diff --git a/pkg/proxy/ipvs/graceful_termination.go b/pkg/proxy/ipvs/graceful_termination.go index 8df14ec774..95973fa75d 100644 --- a/pkg/proxy/ipvs/graceful_termination.go +++ b/pkg/proxy/ipvs/graceful_termination.go @@ -17,13 +17,11 @@ limitations under the License. package ipvs import ( - "container/list" "sync" "time" "fmt" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" ) @@ -38,7 +36,6 @@ const ( type listItem struct { VirtualServer *utilipvs.VirtualServer RealServer *utilipvs.RealServer - ProcessAt time.Time } // String return the unique real server name(with virtual server information) @@ -53,8 +50,7 @@ func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string type graceTerminateRSList struct { lock sync.Mutex - list *list.List - set sets.String + list map[string]*listItem } // add push an new element to the rsList @@ -63,13 +59,12 @@ func (q *graceTerminateRSList) add(rs *listItem) bool { defer q.lock.Unlock() uniqueRS := rs.String() - if q.set.Has(uniqueRS) { + if _, ok := q.list[uniqueRS]; ok { return false } - glog.V(5).Infof("Pushing rs %v to graceful delete rsList: %+v", rs) - q.list.PushBack(rs) - q.set.Insert(uniqueRS) + glog.V(5).Infof("Adding rs %v to graceful delete rsList", rs) + q.list[uniqueRS] = rs return true } @@ -79,29 +74,30 @@ func (q *graceTerminateRSList) remove(rs *listItem) bool { defer q.lock.Unlock() uniqueRS := rs.String() - if !q.set.Has(uniqueRS) { + if _, ok := q.list[uniqueRS]; ok { return false } - q.set.Delete(uniqueRS) - for e := q.list.Front(); e.Next() == nil; e = e.Next() { - val := e.Value.(*listItem) - if val.String() == uniqueRS { - q.list.Remove(e) - return true - } - } - return false + delete(q.list, uniqueRS) + return true } -// head return the first element from the rsList -func (q *graceTerminateRSList) head() (*listItem, bool) { +func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool { q.lock.Lock() defer q.lock.Unlock() - if q.list.Len() == 0 { - return nil, false + + success := true + for name, rs := range q.list { + deleted, err := handler(rs) + if err != nil { + glog.Errorf("Try delete rs %q err: %v", name, err) + success = false + } + if deleted { + glog.Infof("lw: remote out of the list: %s", name) + q.remove(rs) + } } - result := q.list.Front().Value.(*listItem) - return result, true + return success } // exist check whether the specified unique RS is in the rsList @@ -109,15 +105,9 @@ func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) { q.lock.Lock() defer q.lock.Unlock() - if !q.set.Has(uniqueRS) { + if _, ok := q.list[uniqueRS]; ok { return nil, false } - for e := q.list.Front(); e.Next() == nil; e = e.Next() { - val := e.Value.(*listItem) - if val.String() == uniqueRS { - return val, true - } - } return nil, false } @@ -130,12 +120,10 @@ type GracefulTerminationManager struct { // NewGracefulTerminationManager create a gracefulTerminationManager to manage ipvs rs graceful termination work func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager { - l := list.New() - l.Init() + l := make(map[string]*listItem) return &GracefulTerminationManager{ rsList: graceTerminateRSList{ list: l, - set: sets.NewString(), }, ipvs: ipvs, } @@ -149,38 +137,53 @@ func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool { // GracefulDeleteRS to update rs weight to 0, and add rs to graceful terminate list func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error { - rs.Weight = 0 - err := m.ipvs.UpdateRealServer(vs, rs) - if err != nil { - return err - } - + // Try to delete rs before add it to graceful delete list ele := &listItem{ VirtualServer: vs, RealServer: rs, - ProcessAt: time.Now().Add(rsGracefulDeletePeriod), + } + deleted, err := m.deleteRsFunc(ele) + if err != nil { + glog.Errorf("Delete rs %q err: %v", err) + } + if deleted { + return nil + } + rs.Weight = 0 + err = m.ipvs.UpdateRealServer(vs, rs) + if err != nil { + return err } glog.V(5).Infof("Adding an element to graceful delete rsList: %+v", ele) m.rsList.add(ele) return nil } -func (m *GracefulTerminationManager) tryDeleteRs() { - for { - rsToDelete, _ := m.rsList.head() - glog.V(5).Infof("Trying to delete rs") - if rsToDelete == nil || rsToDelete.ProcessAt.After(time.Now()) { - break +func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) { + glog.Infof("Trying to delete rs: %s", rsToDelete.String()) + rss, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer) + if err != nil { + return false, err + } + for _, rs := range rss { + if rsToDelete.RealServer.Equal(rs) { + if rs.ActiveConn != 0 { + return false, nil + } + glog.Infof("Deleting rs: %s", rsToDelete.String()) + err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rs) + if err != nil { + return false, fmt.Errorf("Delete destination %q err: %v", rs.String(), err) + } + return true, nil } + } + return false, fmt.Errorf("Failed to delete rs %q, can't find the real server", rsToDelete.String()) +} - glog.V(5).Infof("Deleting rs: %s", rsToDelete.String()) - err := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, rsToDelete.RealServer) - if err != nil { - glog.Errorf("Failed to delete destination: %v, error: %v", rsToDelete.RealServer, err) - } - if !m.rsList.remove(rsToDelete) { - glog.Errorf("Failed to pop out rsList.") - } +func (m *GracefulTerminationManager) tryDeleteRs() { + if !m.rsList.flushList(m.deleteRsFunc) { + glog.Errorf("Try flush graceful termination list err") } } @@ -212,14 +215,7 @@ func (m *GracefulTerminationManager) Run() { continue } for _, rs := range rss { - if rs.Weight == 0 { - ele := &listItem{ - VirtualServer: vs, - RealServer: rs, - ProcessAt: time.Now().Add(rsGracefulDeletePeriod), - } - m.rsList.add(ele) - } + m.GracefulDeleteRS(vs, rs) } } diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go index d6910abaef..ca1b55696b 100644 --- a/pkg/util/ipvs/ipvs.go +++ b/pkg/util/ipvs/ipvs.go @@ -93,9 +93,11 @@ func (svc *VirtualServer) String() string { // RealServer is an user-oriented definition of an IPVS real server in its entirety. type RealServer struct { - Address net.IP - Port uint16 - Weight int + Address net.IP + Port uint16 + Weight int + ActiveConn int + InactiveConn int } func (rs *RealServer) String() string { diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go index a6d2e1862d..b7fcca9053 100644 --- a/pkg/util/ipvs/ipvs_linux.go +++ b/pkg/util/ipvs/ipvs_linux.go @@ -215,9 +215,11 @@ func toRealServer(dst *libipvs.Destination) (*RealServer, error) { return nil, errors.New("ipvs destination should not be empty") } return &RealServer{ - Address: dst.Address, - Port: dst.Port, - Weight: dst.Weight, + Address: dst.Address, + Port: dst.Port, + Weight: dst.Weight, + ActiveConn: dst.ActiveConnections, + InactiveConn: dst.InactiveConnections, }, nil } diff --git a/pkg/util/ipvs/testing/fake_test.go b/pkg/util/ipvs/testing/fake_test.go index c07a2617f6..8bed6fc782 100644 --- a/pkg/util/ipvs/testing/fake_test.go +++ b/pkg/util/ipvs/testing/fake_test.go @@ -124,9 +124,9 @@ func TestRealServer(t *testing.T) { Protocol: string("TCP"), } rss := []*utilipvs.RealServer{ - {net.ParseIP("172.16.2.1"), 8080, 1}, - {net.ParseIP("172.16.2.2"), 8080, 2}, - {net.ParseIP("172.16.2.3"), 8080, 3}, + {Address: net.ParseIP("172.16.2.1"), Port: 8080, Weight: 1}, + {Address: net.ParseIP("172.16.2.2"), Port: 8080, Weight: 2}, + {Address: net.ParseIP("172.16.2.3"), Port: 8080, Weight: 3}, } err := fake.AddVirtualServer(vs) if err != nil {