fix ipvs esipp

pull/8/head
Lion-Wei 2018-03-13 16:10:26 +08:00
parent 3538676841
commit 6762a865db
3 changed files with 224 additions and 150 deletions

View File

@ -40,8 +40,8 @@ const (
// KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal.
KubeLoadBalancerSet = "KUBE-LOAD-BALANCER"
// KubeLoadBalancerMasqSet is used to store service load balancer ingress ip + port for masquerade purpose.
KubeLoadBalancerMasqSet = "KUBE-LOAD-BALANCER-MASQ"
// KubeLoadBalancerIngressLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local.
KubeLoadBalancerIngressLocalSet = "KUBE-LB-INGRESS-LOCAL"
// KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose.
KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP"
@ -49,11 +49,17 @@ const (
// KubeLoadBalancerSourceCIDRSet is used to store service load balancer ingress ip + port + source cidr for packet filter purpose.
KubeLoadBalancerSourceCIDRSet = "KUBE-LOAD-BALANCER-SOURCE-CIDR"
// KubeNodePortSetTCP is used to store nodeport TCP port for masquerade purpose.
// KubeNodePortSetTCP is used to store the nodeport TCP port for masquerade purpose.
KubeNodePortSetTCP = "KUBE-NODE-PORT-TCP"
// KubeNodePortSetUDP is used to store nodeport UDP port for masquerade purpose.
// KubeNodePortLocalSetTCP is used to store the nodeport TCP port with externalTrafficPolicy=local.
KubeNodePortLocalSetTCP = "KUBE-NODE-PORT-LOCAL-TCP"
// KubeNodePortSetUDP is used to store the nodeport UDP port for masquerade purpose.
KubeNodePortSetUDP = "KUBE-NODE-PORT-UDP"
// KubeNodePortLocalSetUDP is used to store the nodeport UDP port with externalTrafficPolicy=local.
KubeNodePortLocalSetUDP = "KUBE-NODE-PORT-LOCAL-UDP"
)
// IPSetVersioner can query the current ipset version.

View File

@ -68,6 +68,9 @@ const (
// KubeMarkMasqChain is the mark-for-masquerade chain
KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// KubeNodePortChain is the kubernetes node port chain
KubeNodePortChain utiliptables.Chain = "KUBE-NODE-PORT"
// KubeMarkDropChain is the mark-for-drop chain
KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
@ -157,12 +160,16 @@ type Proxier struct {
nodePortSetTCP *IPSet
// nodePortSetTCP is the bitmap:port type ipset where stores all UDP node port
nodePortSetUDP *IPSet
// lbIngressLocalSet is the hash:ip type ipset where stores all service ip's with externaltrafficPolicy=local
lbIngressLocalSet *IPSet
// nodePortLocalSetTCP is the bitmap:port type ipset where stores all TCP nodeport's with externaltrafficPolicy=local
nodePortLocalSetTCP *IPSet
// nodePortLocalSetUDP is the bitmap:port type ipset where stores all UDP nodeport's with externaltrafficPolicy=local
nodePortLocalSetUDP *IPSet
// externalIPSet is the hash:ip,port type ipset where stores all service ExternalIP:Port
externalIPSet *IPSet
// lbIngressSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port.
lbIngressSet *IPSet
// lbMasqSet is the hash:ip,port type ipset where stores all service load balancer ingress IP:Port which needs masquerade.
lbMasqSet *IPSet
// lbWhiteListIPSet is the hash:ip,port,ip type ipset where stores all service load balancer ingress IP:Port,sourceIP pair, any packets
// with the source IP visit ingress IP:Port can pass through.
lbWhiteListIPSet *IPSet
@ -308,43 +315,45 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
healthzServer: healthzServer,
ipvs: ipvs,
ipvsScheduler: scheduler,
ipGetter: &realIPGetter{nl: NewNetLinkHandle()},
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
ipset: ipset,
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
healthzServer: healthzServer,
ipvs: ipvs,
ipvsScheduler: scheduler,
ipGetter: &realIPGetter{nl: NewNetLinkHandle()},
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
ipset: ipset,
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, isIPv6),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false),
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
}
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@ -511,7 +520,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
// Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
// iptables since we can NOT delete ip set which is still referenced by iptables.
ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP,
KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet}
KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet,
KubeLoadBalancerIngressLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP}
for _, set := range ipSetsToDestroy {
err = ipset.DestroySet(set)
if err != nil {
@ -699,7 +709,8 @@ func (proxier *Proxier) syncProxyRules() {
// make sure ip sets exists in the system.
ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
proxier.lbIngressSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbIngressLocalSet,
proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
if err := ensureIPSets(ipSets...); err != nil {
return
}
@ -731,12 +742,18 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
return
}
// Kube service ipset
if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
// `iptables -t nat -N KUBE-FIRE-WALL`
if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil {
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
return
}
// `iptables -t nat -N KUBE-NODE-PORT`
if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil {
glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err)
return
}
// Build IPVS rules for each service.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
@ -901,23 +918,23 @@ func (proxier *Proxier) syncProxyRules() {
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if !svcInfo.OnlyNodeLocalEndpoints {
if valid := proxier.lbMasqSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbMasqSet.Name))
if valid := proxier.lbIngressSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
continue
}
proxier.lbIngressSet.activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.OnlyNodeLocalEndpoints {
if valid := proxier.lbIngressLocalSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
continue
}
proxier.lbMasqSet.activeEntries.Insert(entry.String())
proxier.lbIngressLocalSet.activeEntries.Insert(entry.String())
}
if len(svcInfo.LoadBalancerSourceRanges) != 0 {
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
if valid := proxier.lbIngressSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
continue
}
proxier.lbIngressSet.activeEntries.Insert(entry.String())
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges {
// ipset call
@ -1009,30 +1026,48 @@ func (proxier *Proxier) syncProxyRules() {
// Nodeports need SNAT, unless they're local.
// ipset call
if !svcInfo.OnlyNodeLocalEndpoints {
entry = &utilipset.Entry{
// No need to provide ip info
Port: svcInfo.NodePort,
Protocol: protocol,
SetType: utilipset.BitmapPort,
entry = &utilipset.Entry{
// No need to provide ip info
Port: svcInfo.NodePort,
Protocol: protocol,
SetType: utilipset.BitmapPort,
}
var nodePortSet *IPSet
switch protocol {
case "tcp":
nodePortSet = proxier.nodePortSetTCP
case "udp":
nodePortSet = proxier.nodePortSetUDP
default:
// It should never hit
glog.Errorf("Unsupported protocol type: %s", protocol)
}
if nodePortSet != nil {
if valid := nodePortSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
continue
}
var nodePortSet *IPSet
switch protocol {
nodePortSet.activeEntries.Insert(entry.String())
}
// Add externaltrafficpolicy=local type nodeport entry
if svcInfo.OnlyNodeLocalEndpoints {
var nodePortLocalSet *IPSet
switch protocol {
case "tcp":
nodePortSet = proxier.nodePortSetTCP
nodePortLocalSet = proxier.nodePortLocalSetTCP
case "udp":
nodePortSet = proxier.nodePortSetUDP
nodePortLocalSet = proxier.nodePortLocalSetUDP
default:
// It should never hit
glog.Errorf("Unsupported protocol type: %s", protocol)
}
if nodePortSet != nil {
if valid := nodePortSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
if nodePortLocalSet != nil {
if valid := nodePortLocalSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortLocalSet.Name))
continue
}
nodePortSet.activeEntries.Insert(entry.String())
nodePortLocalSet.activeEntries.Insert(entry.String())
}
}
@ -1080,8 +1115,9 @@ func (proxier *Proxier) syncProxyRules() {
}
// sync ipset entries
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP,
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet}
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.nodePortSetTCP,
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet, proxier.lbIngressLocalSet,
proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
for i := range ipsetsToSync {
ipsetsToSync[i].syncIPSetEntries()
}
@ -1134,46 +1170,55 @@ func (proxier *Proxier) syncProxyRules() {
// This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
}
if !proxier.lbMasqSet.isEmpty() {
if !proxier.lbIngressSet.isEmpty() {
// Build masquerade rules for packets which cross node visit load balancer ingress IPs.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.lbMasqSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
// link kube-services chain -> kube-fire-wall chain
args := []string{
"-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.lbIngressSet.Name,
"dst,dst",
"-j", string(KubeFireWallChain),
}
writeLine(proxier.natRules, args...)
if !proxier.lbWhiteListCIDRSet.isEmpty() {
)
writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...)
// Don't masq for service with externaltrafficpolicy =local
if !proxier.lbIngressLocalSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
if !proxier.lbWhiteListIPSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
"dst,dst,src",
"-m", "set", "--match-set", proxier.lbIngressLocalSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
// mark masq for others
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"mark MASQ for external traffic policy not local"`),
)
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go thru the firewall, then mark it for DROP
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
// if have whitelist, accept or drop.
if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
if !proxier.lbWhiteListCIDRSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
if !proxier.lbWhiteListIPSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
args = append(args[:0],
"-A", string(KubeFireWallChain),
)
// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go thru the firewall, then mark it for DROP
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
}
}
if !proxier.nodePortSetTCP.isEmpty() {
// Build masquerade rules for packets which cross node visit nodeport.
@ -1183,15 +1228,47 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "set", "--match-set", proxier.nodePortSetTCP.Name,
"dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
// accept for nodeports w/ externaltrafficpolicy=local
if !proxier.nodePortLocalSetTCP.isEmpty() {
args = append(args[:0],
"-A", string(KubeNodePortChain),
"-m", "set", "--match-set", proxier.nodePortLocalSetTCP.Name,
"dst",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
// mark masq for others
args = append(args[:0],
"-A", string(KubeNodePortChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`),
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if !proxier.nodePortSetUDP.isEmpty() {
// accept for nodeports w/ externaltrafficpolicy=local
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "udp", "-p", "udp",
"-m", "set", "--match-set", proxier.nodePortSetUDP.Name,
"dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...)
if !proxier.nodePortLocalSetUDP.isEmpty() {
args = append(args[:0],
"-A", string(KubeNodePortChain),
"-m", "set", "--match-set", proxier.nodePortLocalSetUDP.Name,
"dst",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
// mark masq for others
args = append(args[:0],
"-A", string(KubeNodePortChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"mark MASQ for externaltrafficpolicy=cluster"`),
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
@ -1327,9 +1404,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
}
for _, epInfo := range proxier.endpointsMap[svcPortName] {
if !onlyNodeLocalEndpoints || onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
newEndpoints.Insert(epInfo.String())
}
newEndpoints.Insert(epInfo.String())
}
if !curEndpoints.Equal(newEndpoints) {
@ -1439,17 +1514,17 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables.
return nil
}
func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error {
// `iptables -t nat -N KUBE-FIRE-WALL`
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err)
// `iptables -t nat -N <chainName>`
func (proxier *Proxier) createKubeChain(existingNATChains map[utiliptables.Chain]string, chainName utiliptables.Chain) error {
if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, chainName); err != nil {
return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, chainName, err)
}
// write `:KUBE-FIRE-WALL - [0:0]` in nat table
if chain, ok := existingNATChains[KubeFireWallChain]; ok {
writeLine(natChains, chain)
// write `:<chainName> - [0:0]` in nat table
if chain, ok := existingNATChains[chainName]; ok {
writeLine(proxier.natChains, chain)
} else {
writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain))
writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
}
return nil
}

View File

@ -120,36 +120,38 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
return &Proxier{
exec: fexec,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
clusterCIDR: "10.0.0.0/24",
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
healthChecker: newFakeHealthChecker(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
nodePortAddresses: make([]string, 0),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
exec: fexec,
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil),
iptables: ipt,
ipvs: ipvs,
ipset: ipset,
clusterCIDR: "10.0.0.0/24",
hostname: testHostname,
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
portMapper: &fakePortOpener{[]*utilproxy.LocalPort{}},
healthChecker: newFakeHealthChecker(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
iptablesData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
netlinkHandle: netlinktest.NewFakeNetlinkHandle(),
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, false),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
nodePortLocalSetTCP: NewIPSet(ipset, KubeNodePortLocalSetTCP, utilipset.BitmapPort, false),
nodePortLocalSetUDP: NewIPSet(ipset, KubeNodePortLocalSetUDP, utilipset.BitmapPort, false),
nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false),
nodePortAddresses: make([]string, 0),
networkInterfacer: proxyutiltest.NewFakeNetwork(),
}
}
@ -976,12 +978,6 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
},
KubeLoadBalancerMasqSet: {
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
},
KubeLoadBalancerSourceCIDRSet: {
IP: svcLBIP,
Port: svcPort,
@ -1004,9 +1000,6 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
// Check iptables chain and rules
kubeSvcRules := ipt.GetRules(string(kubeServicesChain))
kubeFWRules := ipt.GetRules(string(KubeFireWallChain))
if !hasJump(kubeSvcRules, string(KubeMarkMasqChain), KubeLoadBalancerMasqSet) {
t.Errorf("Didn't find jump from chain %v match set %v to MASQUERADE", kubeServicesChain, KubeLoadBalancerMasqSet)
}
if !hasJump(kubeSvcRules, string(KubeFireWallChain), KubeLoadBalancerSet) {
t.Errorf("Didn't find jump from chain %v match set %v to %v", kubeServicesChain,
KubeLoadBalancerSet, KubeFireWallChain)