mirror of https://github.com/k3s-io/k3s
Merge pull request #61329 from Lion-Wei/ipvs-esipp
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. fix externaltrafficpolicy=local related ipvs ci case **What this PR does / why we need it**: **Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*: Fixes #61328 **Special notes for your reviewer**: To realize externalTrafficPolicy=local, but do not affect traffic inside the cluster. If thie pr got merged, the iptables rules of ipvs proxy mode ESIPP should be like(for loadbalance case): ``` Chain KUBE-FIRE-WALL (1 references) target prot opt source destination ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LB-INGRESS-LOCAL dst,dst KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 /* mark MASQ for external traffic policy not local */ Chain KUBE-MARK-DROP (0 references) target prot opt source destination MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x8000 Chain KUBE-MARK-MASQ (3 references) target prot opt source destination MARK all -- 0.0.0.0/0 0.0.0.0/0 MARK or 0x4000 Chain KUBE-NODE-PORT (1 references) target prot opt source destination ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-NODE-PORT-LOCAL-TCP dst KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 /* mark MASQ for external traffic policy not local */ Chain KUBE-POSTROUTING (0 references) target prot opt source destination MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 /* kubernetes service traffic requiring SNAT */ mark match 0x4000/0x4000 MASQUERADE all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOOP-BACK dst,dst,src Chain KUBE-SERVICES (2 references) target prot opt source destination KUBE-MARK-MASQ all -- !10.64.0.0/14 0.0.0.0/0 match-set KUBE-CLUSTER-IP dst,dst KUBE-FIRE-WALL all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-LOAD-BALANCER-MASQ dst,dst KUBE-NODE-PORT tcp -- 0.0.0.0/0 0.0.0.0/0 tcp match-set KUBE-NODE-PORT-TCP dst ``` **Release note**: ```release-note NONE ```pull/8/head
commit
9bd6c62a36
|
@ -40,8 +40,8 @@ const (
|
|||
// KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal.
|
||||
KubeLoadBalancerSet = "KUBE-LOAD-BALANCER"
|
||||
|
||||
// KubeLoadBalancerMasqSet is used to store service load balancer ingress ip + port for masquerade purpose.
|
||||
KubeLoadBalancerMasqSet = "KUBE-LOAD-BALANCER-MASQ"
|
||||
// KubeLoadBalancerIngressLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local.
|
||||
KubeLoadBalancerIngressLocalSet = "KUBE-LB-INGRESS-LOCAL"
|
||||
|
||||
// KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose.
|
||||
KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP"
|
||||
|
@ -49,11 +49,17 @@ const (
|
|||
// KubeLoadBalancerSourceCIDRSet is used to store service load balancer ingress ip + port + source cidr for packet filter purpose.
|
||||
KubeLoadBalancerSourceCIDRSet = "KUBE-LOAD-BALANCER-SOURCE-CIDR"
|
||||
|
||||
// KubeNodePortSetTCP is used to store nodeport TCP port for masquerade purpose.
|
||||
// KubeNodePortSetTCP is used to store the nodeport TCP port for masquerade purpose.
|
||||
KubeNodePortSetTCP = "KUBE-NODE-PORT-TCP"
|
||||
|
||||
// KubeNodePortSetUDP is used to store nodeport UDP port for masquerade purpose.
|
||||
// KubeNodePortLocalSetTCP is used to store the nodeport TCP port with externalTrafficPolicy=local.
|
||||
KubeNodePortLocalSetTCP = "KUBE-NODE-PORT-LOCAL-TCP"
|
||||
|
||||
// KubeNodePortSetUDP is used to store the nodeport UDP port for masquerade purpose.
|
||||
KubeNodePortSetUDP = "KUBE-NODE-PORT-UDP"
|
||||
|
||||
// KubeNodePortLocalSetUDP is used to store the nodeport UDP port with externalTrafficPolicy=local.
|
||||
KubeNodePortLocalSetUDP = "KUBE-NODE-PORT-LOCAL-UDP"
|
||||
)
|
||||
|
||||
// IPSetVersioner can query the current ipset version.
|
||||
|
|
|
@ -68,13 +68,16 @@ const (
|
|||
// KubeMarkMasqChain is the mark-for-masquerade chain
|
||||
KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
|
||||
|
||||
// KubeNodePortChain is the kubernetes node port chain
|
||||
KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
|
||||
|
||||
// KubeMarkDropChain is the mark-for-drop chain
|
||||
KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
|
||||
|
||||
// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
|
||||
DefaultScheduler = "rr"
|
||||
|
||||
// DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it.
|
||||
// DefaultDummyDevice is the default dummy interface which ipvs service address will bind to it.
|
||||
DefaultDummyDevice = "kube-ipvs0"
|
||||
)
|
||||
|
||||
|
@ -149,27 +152,31 @@ type Proxier struct {
|
|||
natRules *bytes.Buffer
|
||||
// Added as a member to the struct to allow injection for testing.
|
||||
netlinkHandle NetLinkHandle
|
||||
// loopbackSet is the ipset where stores all endpoints IP:Port,IP for solving hairpin mode purpose.
|
||||
// loopbackSet is the ipset which stores all endpoints IP:Port,IP for solving hairpin mode purpose.
|
||||
loopbackSet *IPSet
|
||||
// clusterIPSet is the ipset where stores all service ClusterIP:Port
|
||||
// clusterIPSet is the ipset which stores all service ClusterIP:Port
|
||||
clusterIPSet *IPSet
|
||||
// nodePortSetTCP is the bitmap:port type ipset where stores all TCP node port
|
||||
// nodePortSetTCP is the bitmap:port type ipset which stores all TCP node port
|
||||
nodePortSetTCP *IPSet
|
||||
// nodePortSetTCP is the bitmap:port type ipset where stores all UDP node port
|
||||
// nodePortSetTCP is the bitmap:port type ipset which stores all UDP node port
|
||||
nodePortSetUDP *IPSet
|
||||
// externalIPSet is the hash:ip,port type ipset where stores all service ExternalIP:Port
|
||||
// lbIngressLocalSet is the hash:ip type ipset which stores all service ip's with externaltrafficPolicy=local
|
||||
lbIngressLocalSet *IPSet
|
||||
// nodePortLocalSetTCP is the bitmap:port type ipset which stores all TCP nodeport's with externaltrafficPolicy=local
|
||||
nodePortLocalSetTCP *IPSet
|
||||
// nodePortLocalSetUDP is the bitmap:port type ipset which stores all UDP nodeport's with externaltrafficPolicy=local
|
||||
nodePortLocalSetUDP *IPSet
|
||||
// externalIPSet is the hash:ip,port type ipset which stores all service ExternalIP:Port
|
||||
externalIPSet *IPSet
|
||||
// lbIngressSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port.
|
||||
// lbIngressSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port.
|
||||
lbIngressSet *IPSet
|
||||
// lbMasqSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port which needs masquerade.
|
||||
lbMasqSet *IPSet
|
||||
// lbWhiteListIPSet is the hash:ip,port,ip type ipset where stores all service load balancer ingress IP:Port,sourceIP pair, any packets
|
||||
// lbWhiteListIPSet is the hash:ip,port,ip type ipset which stores all service load balancer ingress IP:Port,sourceIP pair, any packets
|
||||
// with the source IP visit ingress IP:Port can pass through.
|
||||
lbWhiteListIPSet *IPSet
|
||||
// lbWhiteListIPSet is the hash:ip,port,net type ipset where stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets
|
||||
// lbWhiteListIPSet is the hash:ip,port,net type ipset which stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets
|
||||
// from the source CIDR visit ingress IP:Port can pass through.
|
||||
lbWhiteListCIDRSet *IPSet
|
||||
// Values are as a parameter to select the interfaces where nodeport works.
|
||||
// Values are as a parameter to select the interfaces which nodeport works.
|
||||
nodePortAddresses []string
|
||||
// networkInterfacer defines an interface for several net library functions.
|
||||
// Inject for test purpose.
|
||||
|
@ -308,43 +315,45 @@ func NewProxier(ipt utiliptables.Interface,
|
|||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
|
||||
proxier := &Proxier{
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
serviceMap: make(proxy.ServiceMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
|
||||
syncPeriod: syncPeriod,
|
||||
minSyncPeriod: minSyncPeriod,
|
||||
iptables: ipt,
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
exec: exec,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
healthzServer: healthzServer,
|
||||
ipvs: ipvs,
|
||||
ipvsScheduler: scheduler,
|
||||
ipGetter: &realIPGetter{nl: NewNetLinkHandle()},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
natChains: bytes.NewBuffer(nil),
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
netlinkHandle: NewNetLinkHandle(),
|
||||
ipset: ipset,
|
||||
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
|
||||
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
|
||||
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
|
||||
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
|
||||
lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6),
|
||||
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
|
||||
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
|
||||
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
|
||||
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
|
||||
nodePortAddresses: nodePortAddresses,
|
||||
networkInterfacer: utilproxy.RealNetwork{},
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
serviceMap: make(proxy.ServiceMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
|
||||
syncPeriod: syncPeriod,
|
||||
minSyncPeriod: minSyncPeriod,
|
||||
iptables: ipt,
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
exec: exec,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
healthzServer: healthzServer,
|
||||
ipvs: ipvs,
|
||||
ipvsScheduler: scheduler,
|
||||
ipGetter: &realIPGetter{nl: NewNetLinkHandle()},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
natChains: bytes.NewBuffer(nil),
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
netlinkHandle: NewNetLinkHandle(),
|
||||
ipset: ipset,
|
||||
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
|
||||
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
|
||||
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
|
||||
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
|
||||
lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, isIPv6),
|
||||
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
|
||||
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
|
||||
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
|
||||
nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false),
|
||||
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
|
||||
nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false),
|
||||
nodePortAddresses: nodePortAddresses,
|
||||
networkInterfacer: utilproxy.RealNetwork{},
|
||||
}
|
||||
burstSyncs := 2
|
||||
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
|
||||
|
@ -511,7 +520,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
|
|||
// Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
|
||||
// iptables since we can NOT delete ip set which is still referenced by iptables.
|
||||
ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP,
|
||||
KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet}
|
||||
KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet,
|
||||
KubeLoadBalancerIngressLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP}
|
||||
for _, set := range ipSetsToDestroy {
|
||||
err = ipset.DestroySet(set)
|
||||
if err != nil {
|
||||
|
@ -699,7 +709,8 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
|
||||
// make sure ip sets exists in the system.
|
||||
ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
|
||||
proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
|
||||
proxier.lbIngressSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbIngressLocalSet,
|
||||
proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
|
||||
if err := ensureIPSets(ipSets...); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -731,12 +742,18 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
|
||||
return
|
||||
}
|
||||
// Kube service ipset
|
||||
if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
|
||||
// `iptables -t nat -N KUBE-FIRE-WALL`
|
||||
if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil {
|
||||
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// `iptables -t nat -N KUBE-NODE-PORT`
|
||||
if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil {
|
||||
glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Build IPVS rules for each service.
|
||||
for svcName, svc := range proxier.serviceMap {
|
||||
svcInfo, ok := svc.(*serviceInfo)
|
||||
|
@ -901,23 +918,23 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
|
||||
// If we are proxying globally, we need to masquerade in case we cross nodes.
|
||||
// If we are proxying only locally, we can retain the source IP.
|
||||
if !svcInfo.OnlyNodeLocalEndpoints {
|
||||
if valid := proxier.lbMasqSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbMasqSet.Name))
|
||||
if valid := proxier.lbIngressSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
|
||||
continue
|
||||
}
|
||||
proxier.lbIngressSet.activeEntries.Insert(entry.String())
|
||||
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
|
||||
if svcInfo.OnlyNodeLocalEndpoints {
|
||||
if valid := proxier.lbIngressLocalSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
|
||||
continue
|
||||
}
|
||||
proxier.lbMasqSet.activeEntries.Insert(entry.String())
|
||||
proxier.lbIngressLocalSet.activeEntries.Insert(entry.String())
|
||||
}
|
||||
if len(svcInfo.LoadBalancerSourceRanges) != 0 {
|
||||
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
|
||||
// This currently works for loadbalancers that preserves source ips.
|
||||
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
|
||||
if valid := proxier.lbIngressSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
|
||||
continue
|
||||
}
|
||||
proxier.lbIngressSet.activeEntries.Insert(entry.String())
|
||||
|
||||
allowFromNode := false
|
||||
for _, src := range svcInfo.LoadBalancerSourceRanges {
|
||||
// ipset call
|
||||
|
@ -1009,30 +1026,48 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
|
||||
// Nodeports need SNAT, unless they're local.
|
||||
// ipset call
|
||||
if !svcInfo.OnlyNodeLocalEndpoints {
|
||||
entry = &utilipset.Entry{
|
||||
// No need to provide ip info
|
||||
Port: svcInfo.NodePort,
|
||||
Protocol: protocol,
|
||||
SetType: utilipset.BitmapPort,
|
||||
entry = &utilipset.Entry{
|
||||
// No need to provide ip info
|
||||
Port: svcInfo.NodePort,
|
||||
Protocol: protocol,
|
||||
SetType: utilipset.BitmapPort,
|
||||
}
|
||||
var nodePortSet *IPSet
|
||||
switch protocol {
|
||||
case "tcp":
|
||||
nodePortSet = proxier.nodePortSetTCP
|
||||
case "udp":
|
||||
nodePortSet = proxier.nodePortSetUDP
|
||||
default:
|
||||
// It should never hit
|
||||
glog.Errorf("Unsupported protocol type: %s", protocol)
|
||||
}
|
||||
if nodePortSet != nil {
|
||||
if valid := nodePortSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
|
||||
continue
|
||||
}
|
||||
var nodePortSet *IPSet
|
||||
switch protocol {
|
||||
nodePortSet.activeEntries.Insert(entry.String())
|
||||
}
|
||||
|
||||
// Add externaltrafficpolicy=local type nodeport entry
|
||||
if svcInfo.OnlyNodeLocalEndpoints {
|
||||
var nodePortLocalSet *IPSet
|
||||
switch protocol {
|
||||
case "tcp":
|
||||
nodePortSet = proxier.nodePortSetTCP
|
||||
nodePortLocalSet = proxier.nodePortLocalSetTCP
|
||||
case "udp":
|
||||
nodePortSet = proxier.nodePortSetUDP
|
||||
nodePortLocalSet = proxier.nodePortLocalSetUDP
|
||||
default:
|
||||
// It should never hit
|
||||
glog.Errorf("Unsupported protocol type: %s", protocol)
|
||||
}
|
||||
if nodePortSet != nil {
|
||||
if valid := nodePortSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
|
||||
if nodePortLocalSet != nil {
|
||||
if valid := nodePortLocalSet.validateEntry(entry); !valid {
|
||||
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
|
||||
continue
|
||||
}
|
||||
nodePortSet.activeEntries.Insert(entry.String())
|
||||
nodePortLocalSet.activeEntries.Insert(entry.String())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1080,8 +1115,9 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
}
|
||||
|
||||
// sync ipset entries
|
||||
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP,
|
||||
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet}
|
||||
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.nodePortSetTCP,
|
||||
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet, proxier.lbIngressLocalSet,
|
||||
proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
|
||||
for i := range ipsetsToSync {
|
||||
ipsetsToSync[i].syncIPSetEntries()
|
||||
}
|
||||
|
@ -1134,46 +1170,55 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// This covers cases like GCE load-balancers which get added to the local routing table.
|
||||
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
|
||||
}
|
||||
if !proxier.lbMasqSet.isEmpty() {
|
||||
if !proxier.lbIngressSet.isEmpty() {
|
||||
// Build masquerade rules for packets which cross node visit load balancer ingress IPs.
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "set", "--match-set", proxier.lbMasqSet.Name,
|
||||
"dst,dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
}
|
||||
if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
|
||||
// link kube-services chain -> kube-fire-wall chain
|
||||
args := []string{
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "set", "--match-set", proxier.lbIngressSet.Name,
|
||||
"dst,dst",
|
||||
"-j", string(KubeFireWallChain),
|
||||
}
|
||||
writeLine(proxier.natRules, args...)
|
||||
if !proxier.lbWhiteListCIDRSet.isEmpty() {
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...)
|
||||
// Don't masq for service with externaltrafficpolicy =local
|
||||
if !proxier.lbIngressLocalSet.isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeFireWallChain),
|
||||
"-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
|
||||
"dst,dst,src",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
|
||||
}
|
||||
if !proxier.lbWhiteListIPSet.isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeFireWallChain),
|
||||
"-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
|
||||
"dst,dst,src",
|
||||
"-m", "set", "--match-set", proxier.lbIngressLocalSet.Name,
|
||||
"dst,dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
|
||||
}
|
||||
// mark masq for others
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeFireWallChain),
|
||||
"-m", "comment", "--comment",
|
||||
fmt.Sprintf(`"mark MASQ for external traffic policy not local"`),
|
||||
)
|
||||
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
|
||||
// It means the packet cannot go thru the firewall, then mark it for DROP
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
// if have whitelist, accept or drop.
|
||||
if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
|
||||
if !proxier.lbWhiteListCIDRSet.isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeFireWallChain),
|
||||
"-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
|
||||
"dst,dst,src",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
|
||||
}
|
||||
if !proxier.lbWhiteListIPSet.isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeFireWallChain),
|
||||
"-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
|
||||
"dst,dst,src",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
|
||||
}
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeFireWallChain),
|
||||
)
|
||||
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
|
||||
// It means the packet cannot go thru the firewall, then mark it for DROP
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
|
||||
}
|
||||
}
|
||||
if !proxier.nodePortSetTCP.isEmpty() {
|
||||
// Build masquerade rules for packets which cross node visit nodeport.
|
||||
|
@ -1183,15 +1228,47 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
"-m", "set", "--match-set", proxier.nodePortSetTCP.Name,
|
||||
"dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
|
||||
// accept for nodeports w/ externaltrafficpolicy=local
|
||||
if !proxier.nodePortLocalSetTCP.isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeNodePortChain),
|
||||
"-m", "set", "--match-set", proxier.nodePortLocalSetTCP.Name,
|
||||
"dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
|
||||
}
|
||||
// mark masq for others
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeNodePortChain),
|
||||
"-m", "comment", "--comment",
|
||||
fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`),
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
}
|
||||
if !proxier.nodePortSetUDP.isEmpty() {
|
||||
// accept for nodeports w/ externaltrafficpolicy=local
|
||||
args = append(args[:0],
|
||||
"-A", string(kubeServicesChain),
|
||||
"-m", "udp", "-p", "udp",
|
||||
"-m", "set", "--match-set", proxier.nodePortSetUDP.Name,
|
||||
"dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
|
||||
if !proxier.nodePortLocalSetUDP.isEmpty() {
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeNodePortChain),
|
||||
"-m", "set", "--match-set", proxier.nodePortLocalSetUDP.Name,
|
||||
"dst",
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
|
||||
}
|
||||
// mark masq for others
|
||||
args = append(args[:0],
|
||||
"-A", string(KubeNodePortChain),
|
||||
"-m", "comment", "--comment",
|
||||
fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`),
|
||||
)
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
}
|
||||
|
||||
|
@ -1327,9 +1404,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
|||
}
|
||||
|
||||
for _, epInfo := range proxier.endpointsMap[svcPortName] {
|
||||
if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
|
||||
newEndpoints.Insert(epInfo.String())
|
||||
}
|
||||
newEndpoints.Insert(epInfo.String())
|
||||
}
|
||||
|
||||
if !curEndpoints.Equal(newEndpoints) {
|
||||
|
@ -1439,17 +1514,17 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.
|
|||
return nil
|
||||
}
|
||||
|
||||
func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
|
||||
// `iptables -t nat -N KUBE-FIRE-WALL`
|
||||
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil {
|
||||
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err)
|
||||
// `iptables -t nat -N <chainName>`
|
||||
func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain]string, chainName utiliptables.Chain) error {
|
||||
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, chainName); err != nil {
|
||||
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, chainName, err)
|
||||
}
|
||||
|
||||
// write `:KUBE-FIRE-WALL - [0:0]` in nat table
|
||||
if chain, ok := existingNATChains[KubeFireWallChain]; ok {
|
||||
writeLine(natChains, chain)
|
||||
// write `:<chainName> - [0:0]` in nat table
|
||||
if chain, ok := existingNATChains[chainName]; ok {
|
||||
writeLine(proxier.natChains, chain)
|
||||
} else {
|
||||
writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain))
|
||||
writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -120,36 +120,38 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
|
|||
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
|
||||
}
|
||||
return &Proxier{
|
||||
exec: fexec,
|
||||
serviceMap: make(proxy.ServiceMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
|
||||
iptables: ipt,
|
||||
ipvs: ipvs,
|
||||
ipset: ipset,
|
||||
clusterCIDR: "10.0.0.0/24",
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
ipvsScheduler: DefaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
natChains: bytes.NewBuffer(nil),
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
|
||||
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
|
||||
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
|
||||
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
|
||||
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
|
||||
lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false),
|
||||
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
|
||||
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
|
||||
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
|
||||
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
|
||||
nodePortAddresses: make([]string, 0),
|
||||
networkInterfacer: proxyutiltest.NewFakeNetwork(),
|
||||
exec: fexec,
|
||||
serviceMap: make(proxy.ServiceMap),
|
||||
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
|
||||
endpointsMap: make(proxy.EndpointsMap),
|
||||
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
|
||||
iptables: ipt,
|
||||
ipvs: ipvs,
|
||||
ipset: ipset,
|
||||
clusterCIDR: "10.0.0.0/24",
|
||||
hostname: testHostname,
|
||||
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
|
||||
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
ipvsScheduler: DefaultScheduler,
|
||||
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
natChains: bytes.NewBuffer(nil),
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
|
||||
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
|
||||
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
|
||||
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
|
||||
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
|
||||
lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, false),
|
||||
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
|
||||
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
|
||||
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
|
||||
nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false),
|
||||
nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false),
|
||||
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
|
||||
nodePortAddresses: make([]string, 0),
|
||||
networkInterfacer: proxyutiltest.NewFakeNetwork(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -803,16 +805,10 @@ func TestLoadBalancer(t *testing.T) {
|
|||
fp.syncProxyRules()
|
||||
}
|
||||
|
||||
func strPtr(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func TestOnlyLocalNodePorts(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
nodeIP := net.ParseIP("100.101.102.103")
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP})
|
||||
ipt, fp := buildFakeProxier([]net.IP{nodeIP})
|
||||
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcNodePort := 3001
|
||||
|
@ -835,17 +831,13 @@ func TestOnlyLocalNodePorts(t *testing.T) {
|
|||
}),
|
||||
)
|
||||
|
||||
epIP1 := "10.180.0.1"
|
||||
epIP2 := "10.180.2.1"
|
||||
epIP := "10.180.0.1"
|
||||
makeEndpointsMap(fp,
|
||||
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
IP: epIP1,
|
||||
IP: epIP,
|
||||
NodeName: nil,
|
||||
}, {
|
||||
IP: epIP2,
|
||||
NodeName: strPtr(testHostname),
|
||||
}},
|
||||
Ports: []api.EndpointPort{{
|
||||
Name: svcPortName.Port,
|
||||
|
@ -866,40 +858,42 @@ func TestOnlyLocalNodePorts(t *testing.T) {
|
|||
fp.syncProxyRules()
|
||||
|
||||
// Expect 3 services and 1 destination
|
||||
services, err := ipvs.GetVirtualServers()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get ipvs services, err: %v", err)
|
||||
epVS := &netlinktest.ExpectedVirtualServer{
|
||||
VSNum: 3, IP: nodeIP.String(), Port: uint16(svcNodePort), Protocol: string(api.ProtocolTCP),
|
||||
RS: []netlinktest.ExpectedRealServer{{
|
||||
IP: epIP, Port: uint16(svcPort),
|
||||
}}}
|
||||
checkIPVS(t, fp, epVS)
|
||||
|
||||
// check ipSet rules
|
||||
epEntry := &utilipset.Entry{
|
||||
Port: svcNodePort,
|
||||
Protocol: strings.ToLower(string(api.ProtocolTCP)),
|
||||
SetType: utilipset.BitmapPort,
|
||||
}
|
||||
if len(services) != 3 {
|
||||
t.Errorf("Expect 3 ipvs services, got %d", len(services))
|
||||
epIPSet := netlinktest.ExpectedIPSet{
|
||||
KubeNodePortSetTCP: {epEntry},
|
||||
KubeNodePortLocalSetTCP: {epEntry},
|
||||
}
|
||||
found := false
|
||||
for _, svc := range services {
|
||||
if svc.Address.Equal(nodeIP) && svc.Port == uint16(svcNodePort) && svc.Protocol == string(api.ProtocolTCP) {
|
||||
found = true
|
||||
destinations, err := ipvs.GetRealServers(svc)
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get ipvs destinations, err: %v", err)
|
||||
}
|
||||
if len(destinations) != 1 {
|
||||
t.Errorf("Expect 1 ipvs destination, got %d", len(destinations))
|
||||
} else {
|
||||
if destinations[0].Address.String() != epIP2 || destinations[0].Port != uint16(svcPort) {
|
||||
t.Errorf("service Endpoint mismatch ipvs service destination")
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Expect node port type service, got none")
|
||||
checkIPSet(t, fp, epIPSet)
|
||||
|
||||
// Check iptables chain and rules
|
||||
epIpt := netlinktest.ExpectedIptablesChain{
|
||||
string(kubeServicesChain): {{
|
||||
JumpChain: string(KubeNodePortChain), MatchSet: KubeNodePortSetTCP,
|
||||
}},
|
||||
string(KubeNodePortChain): {{
|
||||
JumpChain: "ACCEPT", MatchSet: KubeNodePortLocalSetTCP,
|
||||
}, {
|
||||
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
|
||||
}},
|
||||
}
|
||||
checkIptables(t, ipt, epIpt)
|
||||
}
|
||||
func TestLoadBalanceSourceRanges(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
|
||||
nodeIP := net.ParseIP("100.101.102.103")
|
||||
ipt, fp := buildFakeProxier([]net.IP{nodeIP})
|
||||
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcLBIP := "1.2.3.4"
|
||||
|
@ -932,7 +926,7 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
|
|||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
IP: epIP,
|
||||
NodeName: strPtr(testHostname),
|
||||
NodeName: nil,
|
||||
}},
|
||||
Ports: []api.EndpointPort{{
|
||||
Name: svcPortName.Port,
|
||||
|
@ -945,82 +939,48 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
|
|||
fp.syncProxyRules()
|
||||
|
||||
// Check ipvs service and destinations
|
||||
services, err := ipvs.GetVirtualServers()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get ipvs services, err: %v", err)
|
||||
}
|
||||
found := false
|
||||
for _, svc := range services {
|
||||
fmt.Printf("address: %s:%d, %s", svc.Address.String(), svc.Port, svc.Protocol)
|
||||
if svc.Address.Equal(net.ParseIP(svcLBIP)) && svc.Port == uint16(svcPort) && svc.Protocol == string(api.ProtocolTCP) {
|
||||
destinations, _ := ipvs.GetRealServers(svc)
|
||||
if len(destinations) != 1 {
|
||||
t.Errorf("Unexpected %d destinations, expect 0 destinations", len(destinations))
|
||||
}
|
||||
for _, ep := range destinations {
|
||||
if ep.Address.String() == epIP && ep.Port == uint16(svcPort) {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Did not got expected loadbalance service")
|
||||
}
|
||||
epVS := &netlinktest.ExpectedVirtualServer{
|
||||
VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(api.ProtocolTCP),
|
||||
RS: []netlinktest.ExpectedRealServer{{
|
||||
IP: epIP, Port: uint16(svcPort),
|
||||
}}}
|
||||
checkIPVS(t, fp, epVS)
|
||||
|
||||
// Check ipset entry
|
||||
expectIPSet := map[string]*utilipset.Entry{
|
||||
KubeLoadBalancerSet: {
|
||||
epIPSet := netlinktest.ExpectedIPSet{
|
||||
KubeLoadBalancerSet: {{
|
||||
IP: svcLBIP,
|
||||
Port: svcPort,
|
||||
Protocol: strings.ToLower(string(api.ProtocolTCP)),
|
||||
SetType: utilipset.HashIPPort,
|
||||
},
|
||||
KubeLoadBalancerMasqSet: {
|
||||
IP: svcLBIP,
|
||||
Port: svcPort,
|
||||
Protocol: strings.ToLower(string(api.ProtocolTCP)),
|
||||
SetType: utilipset.HashIPPort,
|
||||
},
|
||||
KubeLoadBalancerSourceCIDRSet: {
|
||||
}},
|
||||
KubeLoadBalancerSourceCIDRSet: {{
|
||||
IP: svcLBIP,
|
||||
Port: svcPort,
|
||||
Protocol: strings.ToLower(string(api.ProtocolTCP)),
|
||||
Net: svcLBSource,
|
||||
SetType: utilipset.HashIPPortNet,
|
||||
},
|
||||
}
|
||||
for set, entry := range expectIPSet {
|
||||
ents, err := ipset.ListEntries(set)
|
||||
if err != nil || len(ents) != 1 {
|
||||
t.Errorf("Check ipset entries failed for ipset: %q", set)
|
||||
continue
|
||||
}
|
||||
if ents[0] != entry.String() {
|
||||
t.Errorf("Check ipset entries failed for ipset: %q", set)
|
||||
}
|
||||
}},
|
||||
}
|
||||
checkIPSet(t, fp, epIPSet)
|
||||
|
||||
// Check iptables chain and rules
|
||||
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
|
||||
kubeFWRules := ipt.GetRules(string(KubeFireWallChain))
|
||||
if !hasJump(kubeSvcRules, string(KubeMarkMasqChain), KubeLoadBalancerMasqSet) {
|
||||
t.Errorf("Didn't find jump from chain %v match set %v to MASQUERADE", kubeServicesChain, KubeLoadBalancerMasqSet)
|
||||
}
|
||||
if !hasJump(kubeSvcRules, string(KubeFireWallChain), KubeLoadBalancerSet) {
|
||||
t.Errorf("Didn't find jump from chain %v match set %v to %v", kubeServicesChain,
|
||||
KubeLoadBalancerSet, KubeFireWallChain)
|
||||
}
|
||||
if !hasJump(kubeFWRules, "ACCEPT", KubeLoadBalancerSourceCIDRSet) {
|
||||
t.Errorf("Didn't find jump from chain %v match set %v to ACCEPT", kubeServicesChain, KubeLoadBalancerSourceCIDRSet)
|
||||
epIpt := netlinktest.ExpectedIptablesChain{
|
||||
string(kubeServicesChain): {{
|
||||
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet,
|
||||
}},
|
||||
string(KubeFireWallChain): {{
|
||||
JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerSourceCIDRSet,
|
||||
}, {
|
||||
JumpChain: string(KubeMarkDropChain), MatchSet: "",
|
||||
}},
|
||||
}
|
||||
checkIptables(t, ipt, epIpt)
|
||||
}
|
||||
|
||||
func TestOnlyLocalLoadBalancing(t *testing.T) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
fp := NewFakeProxier(ipt, ipvs, ipset, nil)
|
||||
ipt, fp := buildFakeProxier(nil)
|
||||
|
||||
svcIP := "10.20.30.41"
|
||||
svcPort := 80
|
||||
svcNodePort := 3001
|
||||
|
@ -1047,17 +1007,13 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
|
|||
}),
|
||||
)
|
||||
|
||||
epIP1 := "10.180.0.1"
|
||||
epIP2 := "10.180.2.1"
|
||||
epIP := "10.180.0.1"
|
||||
makeEndpointsMap(fp,
|
||||
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
IP: epIP1,
|
||||
IP: epIP,
|
||||
NodeName: nil,
|
||||
}, {
|
||||
IP: epIP2,
|
||||
NodeName: strPtr(testHostname),
|
||||
}},
|
||||
Ports: []api.EndpointPort{{
|
||||
Name: svcPortName.Port,
|
||||
|
@ -1068,6 +1024,44 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
|
|||
)
|
||||
|
||||
fp.syncProxyRules()
|
||||
|
||||
// Expect 2 services and 1 destination
|
||||
epVS := &netlinktest.ExpectedVirtualServer{
|
||||
VSNum: 2, IP: svcLBIP, Port: uint16(svcPort), Protocol: string(api.ProtocolTCP),
|
||||
RS: []netlinktest.ExpectedRealServer{{
|
||||
IP: epIP, Port: uint16(svcPort),
|
||||
}}}
|
||||
checkIPVS(t, fp, epVS)
|
||||
|
||||
// check ipSet rules
|
||||
epIPSet := netlinktest.ExpectedIPSet{
|
||||
KubeLoadBalancerSet: {{
|
||||
IP: svcLBIP,
|
||||
Port: svcPort,
|
||||
Protocol: strings.ToLower(string(api.ProtocolTCP)),
|
||||
SetType: utilipset.HashIPPort,
|
||||
}},
|
||||
KubeLoadBalancerIngressLocalSet: {{
|
||||
IP: svcLBIP,
|
||||
Port: svcPort,
|
||||
Protocol: strings.ToLower(string(api.ProtocolTCP)),
|
||||
SetType: utilipset.HashIPPort,
|
||||
}},
|
||||
}
|
||||
checkIPSet(t, fp, epIPSet)
|
||||
|
||||
// Check iptables chain and rules
|
||||
epIpt := netlinktest.ExpectedIptablesChain{
|
||||
string(kubeServicesChain): {{
|
||||
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet,
|
||||
}},
|
||||
string(KubeFireWallChain): {{
|
||||
JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerIngressLocalSet,
|
||||
}, {
|
||||
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
|
||||
}},
|
||||
}
|
||||
checkIptables(t, ipt, epIpt)
|
||||
}
|
||||
|
||||
func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, port, nodeport int32, targetPort int) []api.ServicePort {
|
||||
|
@ -2399,18 +2393,75 @@ func Test_syncService(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func buildFakeProxier(nodeIP []net.IP) (*iptablestest.FakeIPTables, *Proxier) {
|
||||
ipt := iptablestest.NewFake()
|
||||
ipvs := ipvstest.NewFake()
|
||||
ipset := ipsettest.NewFake(testIPSetVersion)
|
||||
return ipt, NewFakeProxier(ipt, ipvs, ipset, nil)
|
||||
}
|
||||
|
||||
func hasJump(rules []iptablestest.Rule, destChain, ipSet string) bool {
|
||||
match := false
|
||||
for _, r := range rules {
|
||||
if r[iptablestest.Jump] == destChain {
|
||||
match = true
|
||||
if ipSet != "" {
|
||||
if strings.Contains(r[iptablestest.MatchSet], ipSet) {
|
||||
return true
|
||||
}
|
||||
match = false
|
||||
if ipSet == "" {
|
||||
return true
|
||||
}
|
||||
if strings.Contains(r[iptablestest.MatchSet], ipSet) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// checkIptabless to check expected iptables chain and rules
|
||||
func checkIptables(t *testing.T, ipt *iptablestest.FakeIPTables, epIpt netlinktest.ExpectedIptablesChain) {
|
||||
for epChain, epRules := range epIpt {
|
||||
rules := ipt.GetRules(epChain)
|
||||
for _, epRule := range epRules {
|
||||
if !hasJump(rules, epRule.JumpChain, epRule.MatchSet) {
|
||||
t.Errorf("Didn't find jump from chain %v match set %v to %v", epChain, epRule.MatchSet, epRule.JumpChain)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkIPSet to check expected ipset and entries
|
||||
func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
|
||||
for set, entries := range ipSet {
|
||||
ents, err := fp.ipset.ListEntries(set)
|
||||
if err != nil || len(ents) != len(entries) {
|
||||
t.Errorf("Check ipset entries failed for ipset: %q", set)
|
||||
continue
|
||||
}
|
||||
if len(entries) == 1 {
|
||||
if ents[0] != entries[0].String() {
|
||||
t.Errorf("Check ipset entries failed for ipset: %q", set)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkIPVS to check expected ipvs service and destination
|
||||
func checkIPVS(t *testing.T, fp *Proxier, vs *netlinktest.ExpectedVirtualServer) {
|
||||
services, err := fp.ipvs.GetVirtualServers()
|
||||
if err != nil {
|
||||
t.Errorf("Failed to get ipvs services, err: %v", err)
|
||||
}
|
||||
if len(services) != vs.VSNum {
|
||||
t.Errorf("Expect %d ipvs services, got %d", vs.VSNum, len(services))
|
||||
}
|
||||
for _, svc := range services {
|
||||
if svc.Address.String() == vs.IP && svc.Port == vs.Port && svc.Protocol == vs.Protocol {
|
||||
destinations, _ := fp.ipvs.GetRealServers(svc)
|
||||
if len(destinations) != len(vs.RS) {
|
||||
t.Errorf("Expected %d destinations, got %d destinations", len(vs.RS), len(destinations))
|
||||
}
|
||||
if len(vs.RS) == 1 {
|
||||
if destinations[0].Address.String() != vs.RS[0].IP || destinations[0].Port != vs.RS[0].Port {
|
||||
t.Errorf("Unexpected mismatch destinations")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return match
|
||||
}
|
||||
|
|
|
@ -10,10 +10,16 @@ load(
|
|||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["fake.go"],
|
||||
srcs = [
|
||||
"fake.go",
|
||||
"util.go",
|
||||
],
|
||||
importpath = "k8s.io/kubernetes/pkg/proxy/ipvs/testing",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library"],
|
||||
deps = [
|
||||
"//pkg/util/ipset:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
utilipset "k8s.io/kubernetes/pkg/util/ipset"
|
||||
)
|
||||
|
||||
// ExpectedVirtualServer is the expected ipvs rules with VirtualServer and RealServer
|
||||
// VSNum is the expected ipvs virtual server number
|
||||
// IP:Port protocol is the expected ipvs vs info
|
||||
// RS is the RealServer of this expected VirtualServer
|
||||
type ExpectedVirtualServer struct {
|
||||
VSNum int
|
||||
IP string
|
||||
Port uint16
|
||||
Protocol string
|
||||
RS []ExpectedRealServer
|
||||
}
|
||||
|
||||
// ExpectedRealServer is the expected ipvs RealServer
|
||||
type ExpectedRealServer struct {
|
||||
IP string
|
||||
Port uint16
|
||||
}
|
||||
|
||||
// ExpectedIptablesChain is a map of expected iptables chain and jump rules
|
||||
type ExpectedIptablesChain map[string][]ExpectedIptablesRule
|
||||
|
||||
// ExpectedIptablesRule is the expected iptables rules with jump chain and match ipset name
|
||||
type ExpectedIptablesRule struct {
|
||||
JumpChain string
|
||||
MatchSet string
|
||||
}
|
||||
|
||||
// ExpectedIPSet is the expected ipset with set name and entries name
|
||||
type ExpectedIPSet map[string][]*utilipset.Entry
|
Loading…
Reference in New Issue