fix ipvs fw

pull/8/head
Lion-Wei 2018-04-17 19:32:32 +08:00
parent 28a19562bd
commit e90de22a5e
3 changed files with 99 additions and 61 deletions

View File

@ -40,8 +40,11 @@ const (
// KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal.
KubeLoadBalancerSet = "KUBE-LOAD-BALANCER"
// KubeLoadBalancerIngressLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local.
KubeLoadBalancerIngressLocalSet = "KUBE-LB-INGRESS-LOCAL"
// KubeLoadBalancerLocalSet is used to store service load balancer ingress ip + port with externalTrafficPolicy=local.
KubeLoadBalancerLocalSet = "KUBE-LOAD-BALANCER-LOCAL"
// KubeLoadbalancerFWSet is used to store service load balancer ingress ip + port for load balancer with sourceRange.
KubeLoadbalancerFWSet = "KUBE-LOAD-BALANCER-FW"
// KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose.
KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP"

View File

@ -57,7 +57,7 @@ const (
kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
// KubeFireWallChain is the kubernetes firewall chain.
KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL"
KubeFireWallChain utiliptables.Chain = "KUBE-FIREWALL"
// kubePostroutingChain is the kubernetes postrouting chain
kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
@ -74,6 +74,9 @@ const (
// KubeForwardChain is the kubernetes forward chain
KubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
// KubeLoadBalancerChain is the kubernetes chain for loadbalancer type service
KubeLoadBalancerChain utiliptables.Chain = "KUBE-LOAD-BALANCER"
// DefaultScheduler is the default ipvs scheduler algorithm - round robin.
DefaultScheduler = "rr"
@ -164,16 +167,18 @@ type Proxier struct {
nodePortSetTCP *IPSet
// nodePortSetTCP is the bitmap:port type ipset which stores all UDP node port
nodePortSetUDP *IPSet
// lbIngressLocalSet is the hash:ip type ipset which stores all service ip's with externaltrafficPolicy=local
lbIngressLocalSet *IPSet
// nodePortLocalSetTCP is the bitmap:port type ipset which stores all TCP nodeport's with externaltrafficPolicy=local
nodePortLocalSetTCP *IPSet
// nodePortLocalSetUDP is the bitmap:port type ipset which stores all UDP nodeport's with externaltrafficPolicy=local
nodePortLocalSetUDP *IPSet
// externalIPSet is the hash:ip,port type ipset which stores all service ExternalIP:Port
externalIPSet *IPSet
// lbIngressSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port.
lbIngressSet *IPSet
// lbSet is the hash:ip,port type ipset which stores all service load balancer IP:Port.
lbSet *IPSet
// lbLocalSet is the hash:ip type ipset which stores all service ip's with externaltrafficPolicy=local
lbLocalSet *IPSet
// lbFWSet is the hash:ip,port type ipset which stores all service load balancer ingress IP:Port for load balancer with sourceRange.
lbFWSet *IPSet
// lbWhiteListIPSet is the hash:ip,port,ip type ipset which stores all service load balancer ingress IP:Port,sourceIP pair, any packets
// with the source IP visit ingress IP:Port can pass through.
lbWhiteListIPSet *IPSet
@ -351,8 +356,9 @@ func NewProxier(ipt utiliptables.Interface,
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, isIPv6),
lbSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6),
lbFWSet: NewIPSet(ipset, KubeLoadbalancerFWSet, utilipset.HashIPPort, isIPv6),
lbLocalSet: NewIPSet(ipset, KubeLoadBalancerLocalSet, utilipset.HashIPPort, isIPv6),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
@ -496,7 +502,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool
}
// Flush and remove all of our "-t nat" chains.
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain} {
for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeNodePortChain, KubeLoadBalancerChain, KubeFireWallChain} {
if err := ipt.FlushChain(utiliptables.TableNAT, chain); err != nil {
if !utiliptables.IsNotFoundError(err) {
glog.Errorf("Error removing iptables rules in ipvs proxier: %v", err)
@ -553,8 +559,8 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
// Destroy ip sets created by ipvs Proxier. We should call it after cleaning up
// iptables since we can NOT delete ip set which is still referenced by iptables.
ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP,
KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet,
KubeLoadBalancerIngressLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP}
KubeExternalIPSet, KubeLoadbalancerFWSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet,
KubeLoadBalancerLocalSet, KubeNodePortLocalSetUDP, KubeNodePortLocalSetTCP}
for _, set := range ipSetsToDestroy {
err = ipset.DestroySet(set)
if err != nil {
@ -755,7 +761,7 @@ func (proxier *Proxier) syncProxyRules() {
// make sure ip sets exists in the system.
ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
proxier.lbIngressSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbIngressLocalSet,
proxier.lbSet, proxier.lbFWSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet, proxier.lbLocalSet,
proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
if err := ensureIPSets(ipSets...); err != nil {
return
@ -788,9 +794,19 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
return
}
// `iptables -t nat -N KUBE-FIRE-WALL`
// `iptables -t nat -N KUBE-FIREWALL`
if err := proxier.createKubeChain(existingNATChains, KubeFireWallChain); err != nil {
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
glog.Errorf("Failed to create KUBE-FIREWALL chain: %v", err)
return
}
// `iptables -t nat -N KUBE-NODE-PORT`
if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil {
glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err)
return
}
// `iptables -t nat -N KUBE-LOAD-BALANCER`
if err := proxier.createKubeChain(existingNATChains, KubeLoadBalancerChain); err != nil {
glog.Errorf("Failed to create KUBE-LOAD-BALANCER chain: %v", err)
return
}
// Kube forward
@ -799,12 +815,6 @@ func (proxier *Proxier) syncProxyRules() {
return
}
// `iptables -t nat -N KUBE-NODE-PORT`
if err := proxier.createKubeChain(existingNATChains, KubeNodePortChain); err != nil {
glog.Errorf("Failed to create KUBE-NODE-PORT chain: %v", err)
return
}
// Build IPVS rules for each service.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
@ -968,23 +978,28 @@ func (proxier *Proxier) syncProxyRules() {
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if valid := proxier.lbIngressSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
if valid := proxier.lbSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbSet.Name))
continue
}
proxier.lbIngressSet.activeEntries.Insert(entry.String())
proxier.lbSet.activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.OnlyNodeLocalEndpoints {
if valid := proxier.lbIngressLocalSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbIngressSet.Name))
if valid := proxier.lbLocalSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbLocalSet.Name))
continue
}
proxier.lbIngressLocalSet.activeEntries.Insert(entry.String())
proxier.lbLocalSet.activeEntries.Insert(entry.String())
}
if len(svcInfo.LoadBalancerSourceRanges) != 0 {
// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
if valid := proxier.lbFWSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.lbFWSet.Name))
continue
}
proxier.lbFWSet.activeEntries.Insert(entry.String())
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges {
// ipset call
@ -1164,9 +1179,9 @@ func (proxier *Proxier) syncProxyRules() {
}
// sync ipset entries
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.nodePortSetTCP,
proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet, proxier.lbIngressLocalSet,
proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbSet, proxier.nodePortSetTCP,
proxier.lbFWSet, proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet,
proxier.lbWhiteListCIDRSet, proxier.lbLocalSet, proxier.nodePortLocalSetTCP, proxier.nodePortLocalSetUDP}
for i := range ipsetsToSync {
ipsetsToSync[i].syncIPSetEntries()
}
@ -1219,39 +1234,31 @@ func (proxier *Proxier) syncProxyRules() {
// This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...)
}
if !proxier.lbIngressSet.isEmpty() {
if !proxier.lbSet.isEmpty() {
// Build masquerade rules for packets which cross node visit load balancer ingress IPs.
args = append(args[:0],
"-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.lbIngressSet.Name,
"-m", "set", "--match-set", proxier.lbSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...)
// Don't masq for service with externaltrafficpolicy =local
if !proxier.lbIngressLocalSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbIngressLocalSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
}
// mark masq for others
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"mark MASQ for external traffic policy not local"`),
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
writeLine(proxier.natRules, append(args, "-j", string(KubeLoadBalancerChain))...)
// if have whitelist, accept or drop.
if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() {
if !proxier.lbFWSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeLoadBalancerChain),
"-m", "set", "--match-set", proxier.lbFWSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", string(KubeFireWallChain))...)
}
if !proxier.lbWhiteListCIDRSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeFireWallChain),
"-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
writeLine(proxier.natRules, append(args, "-j", "RETURN")...)
}
if !proxier.lbWhiteListIPSet.isEmpty() {
args = append(args[:0],
@ -1259,7 +1266,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name,
"dst,dst,src",
)
writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...)
writeLine(proxier.natRules, append(args, "-j", "RETURN")...)
}
args = append(args[:0],
"-A", string(KubeFireWallChain),
@ -1268,6 +1275,22 @@ func (proxier *Proxier) syncProxyRules() {
// It means the packet cannot go thru the firewall, then mark it for DROP
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
}
// Don't masq for service with externaltrafficpolicy =local
if !proxier.lbLocalSet.isEmpty() {
args = append(args[:0],
"-A", string(KubeLoadBalancerChain),
"-m", "set", "--match-set", proxier.lbLocalSet.Name,
"dst,dst",
)
writeLine(proxier.natRules, append(args, "-j", "RETURN")...)
}
// mark masq for others
args = append(args[:0],
"-A", string(KubeLoadBalancerChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"mark MASQ for external traffic policy not local"`),
)
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
}
if !proxier.nodePortSetTCP.isEmpty() {
// Build masquerade rules for packets which cross node visit nodeport.
@ -1424,7 +1447,7 @@ func (proxier *Proxier) syncProxyRules() {
}
func (proxier *Proxier) acceptIPVSTraffic() {
sets := []*IPSet{proxier.clusterIPSet, proxier.externalIPSet, proxier.lbIngressSet}
sets := []*IPSet{proxier.clusterIPSet, proxier.externalIPSet, proxier.lbSet}
for _, set := range sets {
var matchType string
if !set.isEmpty() {

View File

@ -145,8 +145,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false),
clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false),
externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false),
lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
lbIngressLocalSet: NewIPSet(ipset, KubeLoadBalancerIngressLocalSet, utilipset.HashIPPort, false),
lbSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false),
lbFWSet: NewIPSet(ipset, KubeLoadbalancerFWSet, utilipset.HashIPPort, false),
lbLocalSet: NewIPSet(ipset, KubeLoadBalancerLocalSet, utilipset.HashIPPort, false),
lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false),
lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false),
nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false),
@ -957,6 +958,12 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
}},
KubeLoadbalancerFWSet: {{
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
}},
KubeLoadBalancerSourceCIDRSet: {{
IP: svcLBIP,
Port: svcPort,
@ -970,10 +977,15 @@ func TestLoadBalanceSourceRanges(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet,
JumpChain: string(KubeLoadBalancerChain), MatchSet: KubeLoadBalancerSet,
}},
string(KubeLoadBalancerChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadbalancerFWSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
}},
string(KubeFireWallChain): {{
JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerSourceCIDRSet,
JumpChain: "RETURN", MatchSet: KubeLoadBalancerSourceCIDRSet,
}, {
JumpChain: string(KubeMarkDropChain), MatchSet: "",
}},
@ -1109,7 +1121,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
Protocol: strings.ToLower(string(api.ProtocolTCP)),
SetType: utilipset.HashIPPort,
}},
KubeLoadBalancerIngressLocalSet: {{
KubeLoadBalancerLocalSet: {{
IP: svcLBIP,
Port: svcPort,
Protocol: strings.ToLower(string(api.ProtocolTCP)),
@ -1121,10 +1133,10 @@ func TestOnlyLocalLoadBalancing(t *testing.T) {
// Check iptables chain and rules
epIpt := netlinktest.ExpectedIptablesChain{
string(kubeServicesChain): {{
JumpChain: string(KubeFireWallChain), MatchSet: KubeLoadBalancerSet,
JumpChain: string(KubeLoadBalancerChain), MatchSet: KubeLoadBalancerSet,
}},
string(KubeFireWallChain): {{
JumpChain: "ACCEPT", MatchSet: KubeLoadBalancerIngressLocalSet,
string(KubeLoadBalancerChain): {{
JumpChain: "RETURN", MatchSet: KubeLoadBalancerLocalSet,
}, {
JumpChain: string(KubeMarkMasqChain), MatchSet: "",
}},
@ -2580,7 +2592,7 @@ func checkIPSet(t *testing.T, fp *Proxier, ipSet netlinktest.ExpectedIPSet) {
for set, entries := range ipSet {
ents, err := fp.ipset.ListEntries(set)
if err != nil || len(ents) != len(entries) {
t.Errorf("Check ipset entries failed for ipset: %q", set)
t.Errorf("Check ipset entries failed for ipset: %q, expect %d, got %d", set, len(entries), len(ents))
continue
}
if len(entries) == 1 {