mirror of https://github.com/k3s-io/k3s
add cleanLegacyBindAddr
update gofmt && modify comment && modify currentBindAddrs modify log to Upper-casepull/8/head
parent
b05a61e299
commit
7973626e94
|
@ -30,6 +30,8 @@ type NetLinkHandle interface {
|
|||
EnsureDummyDevice(devName string) (exist bool, err error)
|
||||
// DeleteDummyDevice deletes the given dummy device by name.
|
||||
DeleteDummyDevice(devName string) error
|
||||
// ListBindAddress will list all IP addresses which are bound in a given interface
|
||||
ListBindAddress(devName string) ([]string, error)
|
||||
// GetLocalAddresses returns all unique local type IP addresses based on filter device interface. If filter device is not given,
|
||||
// it will list all unique local type addresses.
|
||||
GetLocalAddresses(filterDev string) (sets.String, error)
|
||||
|
|
|
@ -105,6 +105,23 @@ func (h *netlinkHandle) DeleteDummyDevice(devName string) error {
|
|||
return h.LinkDel(dummy)
|
||||
}
|
||||
|
||||
// ListBindAddress will list all IP addresses which are bound in a given interface
|
||||
func (h *netlinkHandle) ListBindAddress(devName string) ([]string, error) {
|
||||
dev, err := h.LinkByName(devName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error get interface: %s, err: %v", devName, err)
|
||||
}
|
||||
addrs, err := h.AddrList(dev, 0)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error list bound address of interface: %s, err: %v", devName, err)
|
||||
}
|
||||
ips := make([]string, 0)
|
||||
for _, addr := range addrs {
|
||||
ips = append(ips, addr.IP.String())
|
||||
}
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
// GetLocalAddresses lists all LOCAL type IP addresses from host based on filter device.
|
||||
// If filter device is not specified, it's equivalent to exec:
|
||||
// $ ip route show table local type local proto kernel
|
||||
|
|
|
@ -52,6 +52,11 @@ func (h *emptyHandle) DeleteDummyDevice(devName string) error {
|
|||
return fmt.Errorf("netlink is not supported in this platform")
|
||||
}
|
||||
|
||||
// ListBindAddress is part of interface.
|
||||
func (h *emptyHandle) ListBindAddress(devName string) ([]string, error) {
|
||||
return nil, fmt.Errorf("netlink is not supported in this platform")
|
||||
}
|
||||
|
||||
// GetLocalAddresses is part of interface.
|
||||
func (h *emptyHandle) GetLocalAddresses(filterDev string) (sets.String, error) {
|
||||
return nil, fmt.Errorf("netlink is not supported in this platform")
|
||||
|
|
|
@ -739,6 +739,8 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
activeIPVSServices := map[string]bool{}
|
||||
// currentIPVSServices represent IPVS services listed from the system
|
||||
currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
|
||||
// activeBindAddrs represents ip address successfully bind to DefaultDummyDevice in this round of sync
|
||||
activeBindAddrs := map[string]bool{}
|
||||
|
||||
// Build IPVS rules for each service.
|
||||
for svcName, svc := range proxier.serviceMap {
|
||||
|
@ -812,6 +814,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
|
||||
if err := proxier.syncService(svcNameString, serv, true); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
|
||||
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
|
||||
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
|
||||
|
@ -881,6 +884,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
}
|
||||
if err := proxier.syncService(svcNameString, serv, true); err == nil {
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
|
||||
glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
|
@ -983,6 +987,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
// check if service need skip endpoints that not in same host as kube-proxy
|
||||
onlyLocal := svcInfo.SessionAffinityType == api.ServiceAffinityClientIP && svcInfo.OnlyNodeLocalEndpoints
|
||||
activeIPVSServices[serv.String()] = true
|
||||
activeBindAddrs[serv.Address.String()] = true
|
||||
if err := proxier.syncEndpoint(svcName, onlyLocal, serv); err != nil {
|
||||
glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
|
||||
}
|
||||
|
@ -1164,6 +1169,14 @@ func (proxier *Proxier) syncProxyRules() {
|
|||
}
|
||||
proxier.cleanLegacyService(activeIPVSServices, currentIPVSServices)
|
||||
|
||||
// Clean up legacy bind address
|
||||
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
|
||||
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get bind address, err: %v", err)
|
||||
}
|
||||
proxier.cleanLegacyBindAddr(activeBindAddrs, currentBindAddrs)
|
||||
|
||||
// Update healthz timestamp
|
||||
if proxier.healthzServer != nil {
|
||||
proxier.healthzServer.UpdateTimestamp()
|
||||
|
@ -1481,6 +1494,7 @@ func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer,
|
|||
// bind service address to dummy interface even if service not changed,
|
||||
// in case that service IP was removed by other processes
|
||||
if bindAddr {
|
||||
glog.V(4).Infof("Bind addr %s", vs.Address.String())
|
||||
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to bind service address to dummy device %q: %v", svcName, err)
|
||||
|
@ -1571,7 +1585,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
|
|||
}
|
||||
|
||||
func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, currentServices map[string]*utilipvs.VirtualServer) {
|
||||
unbindIPAddr := sets.NewString()
|
||||
for cs := range currentServices {
|
||||
svc := currentServices[cs]
|
||||
if _, ok := activeServices[cs]; !ok {
|
||||
|
@ -1590,16 +1603,21 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre
|
|||
if err := proxier.ipvs.DeleteVirtualServer(svc); err != nil {
|
||||
glog.Errorf("Failed to delete service, error: %v", err)
|
||||
}
|
||||
unbindIPAddr.Insert(svc.Address.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, addr := range unbindIPAddr.UnsortedList() {
|
||||
err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice)
|
||||
// Ignore no such address error when try to unbind address
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
|
||||
func (proxier *Proxier) cleanLegacyBindAddr(activeBindAddrs map[string]bool, currentBindAddrs []string) {
|
||||
for _, addr := range currentBindAddrs {
|
||||
if _, ok := activeBindAddrs[addr]; !ok {
|
||||
// This address was not processed in the latest sync loop
|
||||
glog.V(4).Infof("Unbind addr %s", addr)
|
||||
err := proxier.netlinkHandle.UnbindAddress(addr, DefaultDummyDevice)
|
||||
// Ignore no such address error when try to unbind address
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to unbind service addr %s from dummy interface %s: %v", addr, DefaultDummyDevice, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,11 @@ func (h *FakeNetlinkHandle) DeleteDummyDevice(devName string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// ListBindAddress is a mock implementation
|
||||
func (h *FakeNetlinkHandle) ListBindAddress(devName string) ([]string, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetLocalAddresses is a mock implementation
|
||||
func (h *FakeNetlinkHandle) GetLocalAddresses(filterDev string) (sets.String, error) {
|
||||
res := sets.NewString()
|
||||
|
|
Loading…
Reference in New Issue