mirror of https://github.com/k3s-io/k3s
ipvs connection based graceful termination
parent
80ff8b359c
commit
9e4f84f42e
|
@ -17,13 +17,11 @@ limitations under the License.
|
||||||
package ipvs
|
package ipvs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
|
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
|
||||||
)
|
)
|
||||||
|
@ -38,7 +36,6 @@ const (
|
||||||
type listItem struct {
|
type listItem struct {
|
||||||
VirtualServer *utilipvs.VirtualServer
|
VirtualServer *utilipvs.VirtualServer
|
||||||
RealServer *utilipvs.RealServer
|
RealServer *utilipvs.RealServer
|
||||||
ProcessAt time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String return the unique real server name(with virtual server information)
|
// String return the unique real server name(with virtual server information)
|
||||||
|
@ -53,8 +50,7 @@ func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string
|
||||||
|
|
||||||
type graceTerminateRSList struct {
|
type graceTerminateRSList struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
list *list.List
|
list map[string]*listItem
|
||||||
set sets.String
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// add push an new element to the rsList
|
// add push an new element to the rsList
|
||||||
|
@ -63,13 +59,12 @@ func (q *graceTerminateRSList) add(rs *listItem) bool {
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
uniqueRS := rs.String()
|
uniqueRS := rs.String()
|
||||||
if q.set.Has(uniqueRS) {
|
if _, ok := q.list[uniqueRS]; ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
glog.V(5).Infof("Pushing rs %v to graceful delete rsList: %+v", rs)
|
|
||||||
|
|
||||||
q.list.PushBack(rs)
|
glog.V(5).Infof("Adding rs %v to graceful delete rsList", rs)
|
||||||
q.set.Insert(uniqueRS)
|
q.list[uniqueRS] = rs
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,29 +74,30 @@ func (q *graceTerminateRSList) remove(rs *listItem) bool {
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
uniqueRS := rs.String()
|
uniqueRS := rs.String()
|
||||||
if !q.set.Has(uniqueRS) {
|
if _, ok := q.list[uniqueRS]; ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
q.set.Delete(uniqueRS)
|
delete(q.list, uniqueRS)
|
||||||
for e := q.list.Front(); e.Next() == nil; e = e.Next() {
|
|
||||||
val := e.Value.(*listItem)
|
|
||||||
if val.String() == uniqueRS {
|
|
||||||
q.list.Remove(e)
|
|
||||||
return true
|
return true
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// head return the first element from the rsList
|
func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
|
||||||
func (q *graceTerminateRSList) head() (*listItem, bool) {
|
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
if q.list.Len() == 0 {
|
|
||||||
return nil, false
|
success := true
|
||||||
|
for name, rs := range q.list {
|
||||||
|
deleted, err := handler(rs)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Try delete rs %q err: %v", name, err)
|
||||||
|
success = false
|
||||||
}
|
}
|
||||||
result := q.list.Front().Value.(*listItem)
|
if deleted {
|
||||||
return result, true
|
glog.Infof("lw: remote out of the list: %s", name)
|
||||||
|
q.remove(rs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return success
|
||||||
}
|
}
|
||||||
|
|
||||||
// exist check whether the specified unique RS is in the rsList
|
// exist check whether the specified unique RS is in the rsList
|
||||||
|
@ -109,15 +105,9 @@ func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
if !q.set.Has(uniqueRS) {
|
if _, ok := q.list[uniqueRS]; ok {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
for e := q.list.Front(); e.Next() == nil; e = e.Next() {
|
|
||||||
val := e.Value.(*listItem)
|
|
||||||
if val.String() == uniqueRS {
|
|
||||||
return val, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -130,12 +120,10 @@ type GracefulTerminationManager struct {
|
||||||
|
|
||||||
// NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work
|
// NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work
|
||||||
func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
|
func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
|
||||||
l := list.New()
|
l := make(map[string]*listItem)
|
||||||
l.Init()
|
|
||||||
return &GracefulTerminationManager{
|
return &GracefulTerminationManager{
|
||||||
rsList: graceTerminateRSList{
|
rsList: graceTerminateRSList{
|
||||||
list: l,
|
list: l,
|
||||||
set: sets.NewString(),
|
|
||||||
},
|
},
|
||||||
ipvs: ipvs,
|
ipvs: ipvs,
|
||||||
}
|
}
|
||||||
|
@ -149,38 +137,53 @@ func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
|
||||||
|
|
||||||
// GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list
|
// GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list
|
||||||
func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
|
func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
|
||||||
rs.Weight = 0
|
// Try to delete rs before add it to graceful delete list
|
||||||
err := m.ipvs.UpdateRealServer(vs, rs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
ele := &listItem{
|
ele := &listItem{
|
||||||
VirtualServer: vs,
|
VirtualServer: vs,
|
||||||
RealServer: rs,
|
RealServer: rs,
|
||||||
ProcessAt: time.Now().Add(rsGracefulDeletePeriod),
|
}
|
||||||
|
deleted, err := m.deleteRsFunc(ele)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Delete rs %q err: %v", err)
|
||||||
|
}
|
||||||
|
if deleted {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rs.Weight = 0
|
||||||
|
err = m.ipvs.UpdateRealServer(vs, rs)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
glog.V(5).Infof("Adding an element to graceful delete rsList: %+v", ele)
|
glog.V(5).Infof("Adding an element to graceful delete rsList: %+v", ele)
|
||||||
m.rsList.add(ele)
|
m.rsList.add(ele)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *GracefulTerminationManager) tryDeleteRs() {
|
func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
|
||||||
for {
|
glog.Infof("Trying to delete rs: %s", rsToDelete.String())
|
||||||
rsToDelete, _ := m.rsList.head()
|
rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
|
||||||
glog.V(5).Infof("Trying to delete rs")
|
|
||||||
if rsToDelete == nil || rsToDelete.ProcessAt.After(time.Now()) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(5).Infof("Deleting rs: %s", rsToDelete.String())
|
|
||||||
err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to delete destination: %v, error: %v", rsToDelete.RealServer, err)
|
return false, err
|
||||||
}
|
}
|
||||||
if !m.rsList.remove(rsToDelete) {
|
for _, rs := range rss {
|
||||||
glog.Errorf("Failed to pop out rsList.")
|
if rsToDelete.RealServer.Equal(rs) {
|
||||||
|
if rs.ActiveConn != 0 {
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
glog.Infof("Deleting rs: %s", rsToDelete.String())
|
||||||
|
err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("Delete destination %q err: %v", rs.String(), err)
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("Failed to delete rs %q, can't find the real server", rsToDelete.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *GracefulTerminationManager) tryDeleteRs() {
|
||||||
|
if !m.rsList.flushList(m.deleteRsFunc) {
|
||||||
|
glog.Errorf("Try flush graceful termination list err")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,14 +215,7 @@ func (m *GracefulTerminationManager) Run() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, rs := range rss {
|
for _, rs := range rss {
|
||||||
if rs.Weight == 0 {
|
m.GracefulDeleteRS(vs, rs)
|
||||||
ele := &listItem{
|
|
||||||
VirtualServer: vs,
|
|
||||||
RealServer: rs,
|
|
||||||
ProcessAt: time.Now().Add(rsGracefulDeletePeriod),
|
|
||||||
}
|
|
||||||
m.rsList.add(ele)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,6 +96,8 @@ type RealServer struct {
|
||||||
Address net.IP
|
Address net.IP
|
||||||
Port uint16
|
Port uint16
|
||||||
Weight int
|
Weight int
|
||||||
|
ActiveConn int
|
||||||
|
InactiveConn int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rs *RealServer) String() string {
|
func (rs *RealServer) String() string {
|
||||||
|
|
|
@ -218,6 +218,8 @@ func toRealServer(dst *libipvs.Destination) (*RealServer, error) {
|
||||||
Address: dst.Address,
|
Address: dst.Address,
|
||||||
Port: dst.Port,
|
Port: dst.Port,
|
||||||
Weight: dst.Weight,
|
Weight: dst.Weight,
|
||||||
|
ActiveConn: dst.ActiveConnections,
|
||||||
|
InactiveConn: dst.InactiveConnections,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,9 +124,9 @@ func TestRealServer(t *testing.T) {
|
||||||
Protocol: string("TCP"),
|
Protocol: string("TCP"),
|
||||||
}
|
}
|
||||||
rss := []*utilipvs.RealServer{
|
rss := []*utilipvs.RealServer{
|
||||||
{net.ParseIP("172.16.2.1"), 8080, 1},
|
{Address: net.ParseIP("172.16.2.1"), Port: 8080, Weight: 1},
|
||||||
{net.ParseIP("172.16.2.2"), 8080, 2},
|
{Address: net.ParseIP("172.16.2.2"), Port: 8080, Weight: 2},
|
||||||
{net.ParseIP("172.16.2.3"), 8080, 3},
|
{Address: net.ParseIP("172.16.2.3"), Port: 8080, Weight: 3},
|
||||||
}
|
}
|
||||||
err := fake.AddVirtualServer(vs)
|
err := fake.AddVirtualServer(vs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue