ipvs support graceful termination

pull/58/head
liangwei 2018-07-19 20:18:04 +08:00 committed by Lion-Wei
parent 3fe21e5433
commit 80ff8b359c
7 changed files with 355 additions and 73 deletions

View File

@ -37,6 +37,7 @@ go_test(
go_library(
name = "go_default_library",
srcs = [
"graceful_termination.go",
"ipset.go",
"netlink.go",
"netlink_linux.go",

View File

@ -0,0 +1,227 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"container/list"
"sync"
"time"
"fmt"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
)
const (
	// rsGracefulDeletePeriod is how long a real server stays in the graceful
	// delete list before the periodic cleanup is allowed to delete it; it is
	// added to time.Now() to compute each item's ProcessAt.
	rsGracefulDeletePeriod = 15 * time.Minute
	// rsCheckDeleteInterval is the period at which the cleanup started by Run
	// scans the graceful delete list for due items.
	rsCheckDeleteInterval = 1 * time.Minute
)
// listItem stores a real server that is pending graceful deletion, the virtual
// server it belongs to, and the time at which it becomes due.
// If nothing special happens, the real server is deleted once ProcessAt has passed.
type listItem struct {
	// VirtualServer is the virtual server that RealServer belongs to.
	VirtualServer *utilipvs.VirtualServer
	// RealServer is the real server waiting to be deleted.
	RealServer *utilipvs.RealServer
	// ProcessAt is the time after which the periodic cleanup may delete the real server.
	ProcessAt time.Time
}
// String returns the unique name of the real server, which includes the
// virtual server it belongs to. The name is built by GetUniqueRSName.
func (item *listItem) String() string {
	return GetUniqueRSName(item.VirtualServer, item.RealServer)
}
// GetUniqueRSName returns a unique name for rs that also carries the identity
// of vs, in the form "<vs.String()>/<rs.String()>".
func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
	vsName := vs.String()
	rsName := rs.String()
	return vsName + "/" + rsName
}
// graceTerminateRSList is an ordered collection of real servers that are
// pending graceful deletion.
type graceTerminateRSList struct {
	// lock is held by add, remove, head and exist while they touch list and set.
	lock sync.Mutex
	// list holds *listItem values; new items are pushed to the back.
	list *list.List
	// set holds the unique name of every queued item and is used for membership checks.
	set sets.String
}
// add pushes rs to the back of the rsList unless an item with the same unique
// name is already queued.
//
// It returns true when the item was added and false when it was a duplicate.
func (q *graceTerminateRSList) add(rs *listItem) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	uniqueRS := rs.String()
	if q.set.Has(uniqueRS) {
		return false
	}

	// The format string carries exactly one verb for the single argument;
	// a second verb without an operand would print "%!v(MISSING)".
	glog.V(5).Infof("Pushing rs %v to graceful delete rsList", rs)
	q.list.PushBack(rs)
	q.set.Insert(uniqueRS)
	return true
}
// remove deletes the item whose unique name matches rs from the rsList.
//
// It returns true when a matching list entry was found and removed, and false
// when the name was not queued (or the set and list were out of sync).
func (q *graceTerminateRSList) remove(rs *listItem) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	uniqueRS := rs.String()
	if !q.set.Has(uniqueRS) {
		return false
	}
	q.set.Delete(uniqueRS)

	// Walk the whole list: Next returns nil after the last element, so the
	// loop must continue while e itself is non-nil.
	for e := q.list.Front(); e != nil; e = e.Next() {
		if e.Value.(*listItem).String() == uniqueRS {
			q.list.Remove(e)
			return true
		}
	}
	return false
}
// head returns the first item of the rsList without removing it.
// The boolean is false (and the item nil) when the list is empty.
func (q *graceTerminateRSList) head() (*listItem, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	front := q.list.Front()
	if front == nil {
		return nil, false
	}
	return front.Value.(*listItem), true
}
// exist checks whether an item with the given unique name is in the rsList.
//
// It returns the matching item and true when found, and nil and false otherwise.
func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if !q.set.Has(uniqueRS) {
		return nil, false
	}

	// Walk the whole list: Next returns nil after the last element, so the
	// loop must continue while e itself is non-nil.
	for e := q.list.Front(); e != nil; e = e.Next() {
		if item := e.Value.(*listItem); item.String() == uniqueRS {
			return item, true
		}
	}
	return nil, false
}
// GracefulTerminationManager manages real server graceful termination
// information and performs the graceful termination work.
type GracefulTerminationManager struct {
	// rsList is the list of real servers waiting for graceful termination.
	rsList graceTerminateRSList
	// ipvs is the IPVS interface used to update and delete real servers.
	ipvs utilipvs.Interface
}
// NewGracefulTerminationManager creates a GracefulTerminationManager that uses
// ipvs to carry out real server graceful termination work. The returned
// manager starts with an empty graceful delete list.
func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
	return &GracefulTerminationManager{
		rsList: graceTerminateRSList{
			// list.New already returns an initialized list; no extra Init call is needed.
			list: list.New(),
			set:  sets.NewString(),
		},
		ipvs: ipvs,
	}
}
// InTerminationList reports whether the real server with the given unique
// name is currently in the graceful termination list.
func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
	_, found := m.rsList.exist(uniqueRS)
	return found
}
// GracefulDeleteRS sets the weight of rs to 0 (modifying the rs passed in),
// applies that update through the IPVS interface, and then queues rs in the
// graceful terminate list so it is deleted after rsGracefulDeletePeriod.
//
// It returns the error from UpdateRealServer, in which case nothing is queued.
func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
	rs.Weight = 0
	if err := m.ipvs.UpdateRealServer(vs, rs); err != nil {
		return err
	}

	pending := &listItem{
		VirtualServer: vs,
		RealServer:    rs,
		ProcessAt:     time.Now().Add(rsGracefulDeletePeriod),
	}
	glog.V(5).Infof("Adding an element to graceful delete rsList: %+v", pending)
	// add returns false for a duplicate; that result is intentionally not reported.
	m.rsList.add(pending)
	return nil
}
// tryDeleteRs deletes real servers from the front of the rsList for as long as
// the head item's ProcessAt time has passed. It stops when the list is empty,
// when the head item is not yet due, or when the head item cannot be removed
// from the list.
func (m *GracefulTerminationManager) tryDeleteRs() {
	for {
		rsToDelete, _ := m.rsList.head()
		glog.V(5).Infof("Trying to delete rs")
		if rsToDelete == nil || rsToDelete.ProcessAt.After(time.Now()) {
			break
		}

		glog.V(5).Infof("Deleting rs: %s", rsToDelete.String())
		err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
		if err != nil {
			// The failure is only logged; the item is still dropped from the list below.
			glog.Errorf("Failed to delete destination: %v, error: %v", rsToDelete.RealServer, err)
		}
		if !m.rsList.remove(rsToDelete) {
			glog.Errorf("Failed to pop out rsList.")
			// The head is unchanged, so another iteration would pick the same
			// item again and spin forever. Give up until the next scheduled run.
			break
		}
	}
}
// MoveRSOutofGracefulDeleteList deletes the real server with the given unique
// name immediately and then removes it from the rsList.
//
// It returns an error when the name is not in the list, or the error from
// DeleteRealServer, in which case the item stays in the list.
func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
	item, ok := m.rsList.exist(uniqueRS)
	if !(ok && item != nil) {
		return fmt.Errorf("failed to find rs: %q", uniqueRS)
	}
	if err := m.ipvs.DeleteRealServer(item.VirtualServer, item.RealServer); err != nil {
		return err
	}
	m.rsList.remove(item)
	return nil
}
// Run queues every existing real server whose weight is 0 into the graceful
// delete rsList, then starts a goroutine that calls tryDeleteRs every
// rsCheckDeleteInterval. The goroutine is started with wait.NeverStop.
func (m *GracefulTerminationManager) Run() {
	// Before starting, add leftover real servers that are already being
	// drained (weight 0) to the graceful delete rsList.
	vss, err := m.ipvs.GetVirtualServers()
	if err != nil {
		// Include the underlying error so the failure can be diagnosed.
		glog.Errorf("IPVS graceful delete manager failed to get IPVS virtualserver, error: %v", err)
	}

	for _, vs := range vss {
		rss, err := m.ipvs.GetRealServers(vs)
		if err != nil {
			glog.Errorf("IPVS graceful delete manager failed to get %v realserver, error: %v", vs, err)
			continue
		}
		for _, rs := range rss {
			if rs.Weight != 0 {
				continue
			}
			m.rsList.add(&listItem{
				VirtualServer: vs,
				RealServer:    rs,
				ProcessAt:     time.Now().Add(rsGracefulDeletePeriod),
			})
		}
	}

	go wait.Until(m.tryDeleteRs, rsCheckDeleteInterval, wait.NeverStop)
}

View File

@ -231,7 +231,8 @@ type Proxier struct {
nodePortAddresses []string
// networkInterfacer defines an interface for several net library functions.
// Inject for test purpose.
networkInterfacer utilproxy.NetworkInterfacer
networkInterfacer utilproxy.NetworkInterfacer
gracefuldeleteManager *GracefulTerminationManager
}
// IPGetter helps get node network interface IP
@ -353,38 +354,39 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
excludeCIDRs: excludeCIDRs,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
healthzServer: healthzServer,
ipvs: ipvs,
ipvsScheduler: scheduler,
ipGetter: &realIPGetter{nl: NewNetLinkHandle()},
iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
ipset: ipset,
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
excludeCIDRs: excludeCIDRs,
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &listenPortOpener{},
recorder: recorder,
healthChecker: healthChecker,
healthzServer: healthzServer,
ipvs: ipvs,
ipvsScheduler: scheduler,
ipGetter: &realIPGetter{nl: NewNetLinkHandle()},
iptablesData: bytes.NewBuffer(nil),
filterChainsData: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
filterChains: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil),
netlinkHandle: NewNetLinkHandle(),
ipset: ipset,
nodePortAddresses: nodePortAddresses,
networkInterfacer: utilproxy.RealNetwork{},
gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
}
// initialize ipsetList with all sets we needed
proxier.ipsetList = make(map[string]*IPSet)
@ -397,6 +399,7 @@ func NewProxier(ipt utiliptables.Interface,
burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
proxier.gracefuldeleteManager.Run()
return proxier, nil
}
@ -1510,53 +1513,72 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
newEndpoints.Insert(epInfo.String())
}
if !curEndpoints.Equal(newEndpoints) {
// Create new endpoints
for _, ep := range newEndpoints.Difference(curEndpoints).UnsortedList() {
ip, port, err := net.SplitHostPort(ep)
if err != nil {
glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
continue
}
// Create new endpoints
for _, ep := range newEndpoints.List() {
ip, port, err := net.SplitHostPort(ep)
if err != nil {
glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
continue
}
newDest := &utilipvs.RealServer{
Address: net.ParseIP(ip),
Port: uint16(portNum),
Weight: 1,
newDest := &utilipvs.RealServer{
Address: net.ParseIP(ip),
Port: uint16(portNum),
Weight: 1,
}
if curEndpoints.Has(ep) {
// check if newEndpoint is in gracefulDelete list, is true, delete this ep immediately
uniqueRS := GetUniqueRSName(vs, newDest)
if !proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
continue
}
err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
glog.V(5).Infof("new ep %q is in graceful delete list", uniqueRS)
err := proxier.gracefuldeleteManager.MoveRSOutofGracefulDeleteList(uniqueRS)
if err != nil {
glog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
glog.Errorf("Failed to delete endpoint: %v in gracefulDeleteQueue, error: %v", ep, err)
continue
}
}
// Delete old endpoints
for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
ip, port, err := net.SplitHostPort(ep)
if err != nil {
glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
continue
}
err = proxier.ipvs.AddRealServer(appliedVirtualServer, newDest)
if err != nil {
glog.Errorf("Failed to add destination: %v, error: %v", newDest, err)
continue
}
}
// Delete old endpoints
for _, ep := range curEndpoints.Difference(newEndpoints).UnsortedList() {
// if curEndpoint is in gracefulDelete, skip
uniqueRS := vs.String() + "/" + ep
if proxier.gracefuldeleteManager.InTerminationList(uniqueRS) {
continue
}
ip, port, err := net.SplitHostPort(ep)
if err != nil {
glog.Errorf("Failed to parse endpoint: %v, error: %v", ep, err)
continue
}
portNum, err := strconv.Atoi(port)
if err != nil {
glog.Errorf("Failed to parse endpoint port %s, error: %v", port, err)
continue
}
delDest := &utilipvs.RealServer{
Address: net.ParseIP(ip),
Port: uint16(portNum),
}
err = proxier.ipvs.DeleteRealServer(appliedVirtualServer, delDest)
if err != nil {
glog.Errorf("Failed to delete destination: %v, error: %v", delDest, err)
continue
}
delDest := &utilipvs.RealServer{
Address: net.ParseIP(ip),
Port: uint16(portNum),
}
glog.V(5).Infof("Using graceful delete to delete: %v", delDest)
err = proxier.gracefuldeleteManager.GracefulDeleteRS(appliedVirtualServer, delDest)
if err != nil {
glog.Errorf("Failed to delete destination: %v, error: %v", delDest, err)
continue
}
}
return nil
@ -1569,6 +1591,11 @@ func (proxier *Proxier) cleanLegacyService(activeServices map[string]bool, curre
// This service was not processed in the latest sync loop so before deleting it,
// make sure it does not fall within an excluded CIDR range.
okayToDelete := true
rsList, err := proxier.ipvs.GetRealServers(svc)
if len(rsList) != 0 && err == nil {
glog.V(5).Infof("Will not delete VS: %v, cause it have RS: %v", svc, rsList)
okayToDelete = false
}
for _, excludedCIDR := range proxier.excludeCIDRs {
// Any validation of this CIDR already should have occurred.
_, n, _ := net.ParseCIDR(excludedCIDR)

View File

@ -41,6 +41,8 @@ type Interface interface {
GetRealServers(*VirtualServer) ([]*RealServer, error)
// DeleteRealServer deletes the specified real server from the specified virtual server.
DeleteRealServer(*VirtualServer, *RealServer) error
// UpdateRealServer updates the specified real server of the specified virtual server.
UpdateRealServer(*VirtualServer, *RealServer) error
}
// VirtualServer is an user-oriented definition of an IPVS virtual server in its entirety.

View File

@ -144,6 +144,18 @@ func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error
return runner.ipvsHandle.DelDestination(svc, dst)
}
// UpdateRealServer is part of ipvs.Interface.
// It converts vs and rs with toIPVSService and toIPVSDestination and passes
// the results to the handle's UpdateDestination; a conversion error is
// returned before the handle is called.
func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error {
	service, svcErr := toIPVSService(vs)
	if svcErr != nil {
		return svcErr
	}
	destination, dstErr := toIPVSDestination(rs)
	if dstErr != nil {
		return dstErr
	}
	return runner.ipvsHandle.UpdateDestination(service, destination)
}
// GetRealServers is part of ipvs.Interface.
func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
svc, err := toIPVSService(vs)

View File

@ -68,4 +68,8 @@ func (runner *runner) DeleteRealServer(*VirtualServer, *RealServer) error {
return fmt.Errorf("IPVS not supported for this platform")
}
// UpdateRealServer always fails in this implementation: it ignores its
// arguments and returns an error stating that IPVS is not supported.
func (runner *runner) UpdateRealServer(_ *VirtualServer, _ *RealServer) error {
	return fmt.Errorf("IPVS not supported for this platform")
}
var _ = Interface(&runner{})

View File

@ -193,4 +193,13 @@ func (f *FakeIPVS) DeleteRealServer(serv *utilipvs.VirtualServer, dest *utilipvs
return nil
}
// UpdateRealServer is a fake implementation: it deletes the old real server
// and then adds the new one. If the delete fails, its error is returned and
// nothing is added.
func (f *FakeIPVS) UpdateRealServer(serv *utilipvs.VirtualServer, dest *utilipvs.RealServer) error {
	if err := f.DeleteRealServer(serv, dest); err != nil {
		return err
	}
	return f.AddRealServer(serv, dest)
}
var _ = utilipvs.Interface(&FakeIPVS{})