From 80ff8b359c18c1a9728b6d301583cac54ca071a3 Mon Sep 17 00:00:00 2001 From: liangwei Date: Thu, 19 Jul 2018 20:18:04 +0800 Subject: [PATCH] ipvs support graceful termination --- pkg/proxy/ipvs/BUILD | 1 + pkg/proxy/ipvs/graceful_termination.go | 227 +++++++++++++++++++++++++ pkg/proxy/ipvs/proxier.go | 173 +++++++++++-------- pkg/util/ipvs/ipvs.go | 2 + pkg/util/ipvs/ipvs_linux.go | 12 ++ pkg/util/ipvs/ipvs_unsupported.go | 4 + pkg/util/ipvs/testing/fake.go | 9 + 7 files changed, 355 insertions(+), 73 deletions(-) create mode 100644 pkg/proxy/ipvs/graceful_termination.go diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 9b46b66bcf..e7b13f3cac 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -37,6 +37,7 @@ go_test( go_library( name = "go_default_library", srcs = [ + "graceful_termination.go", "ipset.go", "netlink.go", "netlink_linux.go", diff --git a/pkg/proxy/ipvs/graceful_termination.go b/pkg/proxy/ipvs/graceful_termination.go new file mode 100644 index 0000000000..8df14ec774 --- /dev/null +++ b/pkg/proxy/ipvs/graceful_termination.go @@ -0,0 +1,227 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "container/list" + "sync" + "time" + + "fmt" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + utilipvs "k8s.io/kubernetes/pkg/util/ipvs" +) + +const ( + rsGracefulDeletePeriod = 15 * time.Minute + rsCheckDeleteInterval = 1 * time.Minute +) + +// listItem stores real server information and the process time. +// If nothing special happened, real server will be delete after process time. +type listItem struct { + VirtualServer *utilipvs.VirtualServer + RealServer *utilipvs.RealServer + ProcessAt time.Time +} + +// String return the unique real server name(with virtual server information) +func (g *listItem) String() string { + return GetUniqueRSName(g.VirtualServer, g.RealServer) +} + +// GetUniqueRSName return a string type unique rs name with vs information +func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string { + return vs.String() + "/" + rs.String() +} + +type graceTerminateRSList struct { + lock sync.Mutex + list *list.List + set sets.String +} + +// add push an new element to the rsList +func (q *graceTerminateRSList) add(rs *listItem) bool { + q.lock.Lock() + defer q.lock.Unlock() + + uniqueRS := rs.String() + if q.set.Has(uniqueRS) { + return false + } + glog.V(5).Infof("Pushing rs %v to graceful delete rsList: %+v", rs) + + q.list.PushBack(rs) + q.set.Insert(uniqueRS) + return true +} + +// remove remove an element from the rsList +func (q *graceTerminateRSList) remove(rs *listItem) bool { + q.lock.Lock() + defer q.lock.Unlock() + + uniqueRS := rs.String() + if !q.set.Has(uniqueRS) { + return false + } + q.set.Delete(uniqueRS) + for e := q.list.Front(); e.Next() == nil; e = e.Next() { + val := e.Value.(*listItem) + if val.String() == uniqueRS { + q.list.Remove(e) + return true + } + } + return false +} + +// head return the first element from the rsList +func (q *graceTerminateRSList) head() (*listItem, bool) { + q.lock.Lock() + defer q.lock.Unlock() + if q.list.Len() == 0 { + return nil, false + } + result := q.list.Front().Value.(*listItem) + return result, true +} + +// exist check whether the specified unique RS is in the rsList +func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) { + q.lock.Lock() + defer q.lock.Unlock() + + if !q.set.Has(uniqueRS) { + return nil, false + } + for e := q.list.Front(); e.Next() == nil; e = e.Next() { + val := e.Value.(*listItem) + if val.String() == uniqueRS { + return val, true + } + } + return nil, false +} + +// GracefulTerminationManager manage rs graceful termination information and do graceful termination work +// rsList is the rs list to graceful termination, ipvs is the ipvsinterface to do ipvs delete/update work +type GracefulTerminationManager struct { + rsList graceTerminateRSList + ipvs utilipvs.Interface +} + +// NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work +func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager { + l := list.New() + l.Init() + return &GracefulTerminationManager{ + rsList: graceTerminateRSList{ + list: l, + set: sets.NewString(), + }, + ipvs: ipvs, + } +} + +// InTerminationList to check whether specified unique rs name is in graceful termination list +func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool { + _, exist := m.rsList.exist(uniqueRS) + return exist +} + +// GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list +func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error { + rs.Weight = 0 + err := m.ipvs.UpdateRealServer(vs, rs) + if err != nil { + return err + } + + ele := &listItem{ + VirtualServer: vs, + RealServer: rs, + ProcessAt: time.Now().Add(rsGracefulDeletePeriod), + } + glog.V(5).Infof("Adding an element to graceful delete rsList: %+v", ele) + m.rsList.add(ele) + return nil +} + +func (m *GracefulTerminationManager) tryDeleteRs() { + for { + rsToDelete, _ := m.rsList.head() + glog.V(5).Infof("Trying to delete rs") + if rsToDelete == nil || rsToDelete.ProcessAt.After(time.Now()) { + break + } + + glog.V(5).Infof("Deleting rs: %s", rsToDelete.String()) + err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer) + if err != nil { + glog.Errorf("Failed to delete destination: %v, error: %v", rsToDelete.RealServer, err) + } + if !m.rsList.remove(rsToDelete) { + glog.Errorf("Failed to pop out rsList.") + } + } +} + +// MoveRSOutofGracefulDeleteList to delete an rs and remove it from the rsList immediately +func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error { + rsToDelete, find := m.rsList.exist(uniqueRS) + if !find || rsToDelete == nil { + return fmt.Errorf("failed to find rs: %q", uniqueRS) + } + err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer) + if err != nil { + return err + } + m.rsList.remove(rsToDelete) + return nil +} + +// Run start a goroutine to try to delete rs in the graceful delete rsList with an interval 1 minute +func (m *GracefulTerminationManager) Run() { + // before start, add leftover in delete rs to graceful delete rsList + vss, err := m.ipvs.GetVirtualServers() + if err != nil { + glog.Errorf("IPVS graceful delete manager failed to get IPVS virtualserver") + } + for _, vs := range vss { + rss, err := m.ipvs.GetRealServers(vs) + if err != nil { + glog.Errorf("IPVS graceful delete manager failed to get %v realserver", vs) + continue + } + for _, rs := range rss { + if rs.Weight == 0 { + ele := &listItem{ + VirtualServer: vs, + RealServer: rs, + ProcessAt: time.Now().Add(rsGracefulDeletePeriod), + } + m.rsList.add(ele) + } + } + } + + go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop) +} diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 19fe7927eb..2cea33223b 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -231,7 +231,8 @@ type Proxier struct { nodePortAddresses []string // networkInterfacer defines an interface for several net library functions. // Inject for test purpose. - networkInterfacer utilproxy.NetworkInterfacer + networkInterfacer utilproxy.NetworkInterfacer + gracefuldeleteManager *GracefulTerminationManager } // IPGetter helps get node network interface IP @@ -353,38 +354,39 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxy.ServiceMap), - serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), - endpointsMap: make(proxy.EndpointsMap), - endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - excludeCIDRs: excludeCIDRs, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - ipvs: ipvs, - ipvsScheduler: scheduler, - ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, - iptablesData: bytes.NewBuffer(nil), - filterChainsData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - netlinkHandle: NewNetLinkHandle(), - ipset: ipset, - nodePortAddresses: nodePortAddresses, - networkInterfacer: utilproxy.RealNetwork{}, + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxy.ServiceMap), + serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), + endpointsMap: make(proxy.EndpointsMap), + endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + excludeCIDRs: excludeCIDRs, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + ipvs: ipvs, + ipvsScheduler: scheduler, + ipGetter: &realIPGetter{nl: NewNetLinkHandle()}, + iptablesData: bytes.NewBuffer(nil), + filterChainsData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + netlinkHandle: NewNetLinkHandle(), + ipset: ipset, + nodePortAddresses: nodePortAddresses, + networkInterfacer: utilproxy.RealNetwork{}, + gracefuldeleteManager: NewGracefulTerminationManager(ipvs), } // initialize ipsetList with all sets we needed proxier.ipsetList = make(map[string]*IPSet) @@ -397,6 +399,7 @@ func NewProxier(ipt utiliptables.Interface, burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) + proxier.gracefuldeleteManager.Run() return proxier, nil } @@ -1510,53 +1513,72 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode newEndpoints.Insert(epInfo.String()) } - if !curEndpoints.Equal(newEndpoints) { - // Create new endpoints - for _, ep := range newEndpoints.Difference(curEndpoints).UnsortedList() { - ip, port, err := net.SplitHostPort(ep) - if err != nil { - glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err) - continue - } - portNum, err := strconv.Atoi(port) - if err != nil { - glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err) - continue - } + // Create new endpoints + for _, ep := range newEndpoints.List() { + ip, port, err := net.SplitHostPort(ep) + if err != nil { + glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err) + continue + } + portNum, err := strconv.Atoi(port) + if err != nil { + glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err) + continue + } - newDest := &utilipvs.RealServer{ - Address: net.ParseIP(ip), - Port: uint16(portNum), - Weight: 1, + newDest := &utilipvs.RealServer{ + Address: net.ParseIP(ip), + Port: uint16(portNum), + Weight: 1, + } + + if curEndpoints.Has(ep) { + // check if newEndpoint is in gracefulDelete list, is true, delete this ep immediately + uniqueRS := GetUniqueRSName(vs, newDest) + if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { + continue } - err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest) + glog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS) + err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS) if err != nil { - glog.Errorf("Failed to add destination: %v, error: %v", newDest, err) + glog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err) continue } } - // Delete old endpoints - for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() { - ip, port, err := net.SplitHostPort(ep) - if err != nil { - glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err) - continue - } - portNum, err := strconv.Atoi(port) - if err != nil { - glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err) - continue - } + err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest) + if err != nil { + glog.Errorf("Failed to add destination: %v, error: %v", newDest, err) + continue + } + } + // Delete old endpoints + for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() { + // if curEndpoint is in gracefulDelete, skip + uniqueRS := vs.String() + "/" + ep + if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) { + continue + } + ip, port, err := net.SplitHostPort(ep) + if err != nil { + glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err) + continue + } + portNum, err := strconv.Atoi(port) + if err != nil { + glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err) + continue + } - delDest := &utilipvs.RealServer{ - Address: net.ParseIP(ip), - Port: uint16(portNum), - } - err = proxier.ipvs.DeleteRealServer(appliedVirtualServer, delDest) - if err != nil { - glog.Errorf("Failed to delete destination: %v, error: %v", delDest, err) - continue - } + delDest := &utilipvs.RealServer{ + Address: net.ParseIP(ip), + Port: uint16(portNum), + } + + glog.V(5).Infof("Using graceful delete to delete: %v", delDest) + err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest) + if err != nil { + glog.Errorf("Failed to delete destination: %v, error: %v", delDest, err) + continue } } return nil @@ -1569,6 +1591,11 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre // This service was not processed in the latest sync loop so before deleting it, // make sure it does not fall within an excluded CIDR range. okayToDelete := true + rsList, err := proxier.ipvs.GetRealServers(svc) + if len(rsList) != 0 && err == nil { + glog.V(5).Infof("Will not delete VS: %v, cause it have RS: %v", svc, rsList) + okayToDelete = false + } for _, excludedCIDR := range proxier.excludeCIDRs { // Any validation of this CIDR already should have occurred. _, n, _ := net.ParseCIDR(excludedCIDR) diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go index 58e76a56c9..d6910abaef 100644 --- a/pkg/util/ipvs/ipvs.go +++ b/pkg/util/ipvs/ipvs.go @@ -41,6 +41,8 @@ type Interface interface { GetRealServers(*VirtualServer) ([]*RealServer, error) // DeleteRealServer deletes the specified real server from the specified virtual server. DeleteRealServer(*VirtualServer, *RealServer) error + // UpdateRealServer updates the specified real server from the specified virtual server. + UpdateRealServer(*VirtualServer, *RealServer) error } // VirtualServer is an user-oriented definition of an IPVS virtual server in its entirety. diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go index 47c640c183..a6d2e1862d 100644 --- a/pkg/util/ipvs/ipvs_linux.go +++ b/pkg/util/ipvs/ipvs_linux.go @@ -144,6 +144,18 @@ func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error return runner.ipvsHandle.DelDestination(svc, dst) } +func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error { + svc, err := toIPVSService(vs) + if err != nil { + return err + } + dst, err := toIPVSDestination(rs) + if err != nil { + return err + } + return runner.ipvsHandle.UpdateDestination(svc, dst) +} + // GetRealServers is part of ipvs.Interface. func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) { svc, err := toIPVSService(vs) diff --git a/pkg/util/ipvs/ipvs_unsupported.go b/pkg/util/ipvs/ipvs_unsupported.go index dd4d5b625b..86447d57c5 100644 --- a/pkg/util/ipvs/ipvs_unsupported.go +++ b/pkg/util/ipvs/ipvs_unsupported.go @@ -68,4 +68,8 @@ func (runner *runner) DeleteRealServer(*VirtualServer, *RealServer) error { return fmt.Errorf("IPVS not supported for this platform") } +func (runner *runner) UpdateRealServer(*VirtualServer, *RealServer) error { + return fmt.Errorf("IPVS not supported for this platform") +} + var _ = Interface(&runner{}) diff --git a/pkg/util/ipvs/testing/fake.go b/pkg/util/ipvs/testing/fake.go index 6e015a20ee..bfc854bb36 100644 --- a/pkg/util/ipvs/testing/fake.go +++ b/pkg/util/ipvs/testing/fake.go @@ -193,4 +193,13 @@ func (f *FakeIPVS) DeleteRealServer(serv *utilipvs.VirtualServer, dest *utilipvs return nil } +// UpdateRealServer is a fake implementation, it deletes the old real server then add new real server +func (f *FakeIPVS) UpdateRealServer(serv *utilipvs.VirtualServer, dest *utilipvs.RealServer) error { + err := f.DeleteRealServer(serv, dest) + if err != nil { + return err + } + return f.AddRealServer(serv, dest) +} + var _ = utilipvs.Interface(&FakeIPVS{})