diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go index b4f53be6fd..54dac29426 100644 --- a/pkg/util/ipvs/ipvs_linux.go +++ b/pkg/util/ipvs/ipvs_linux.go @@ -23,6 +23,7 @@ import ( "fmt" "net" "strings" + "sync" "syscall" libipvs "github.com/docker/libnetwork/ipvs" @@ -34,6 +35,7 @@ import ( type runner struct { exec utilexec.Interface ipvsHandle *libipvs.Handle + mu sync.Mutex // Protect Netlink calls } // Protocol is the IPVS service protocol type @@ -58,6 +60,8 @@ func (runner *runner) AddVirtualServer(vs *VirtualServer) error { if err != nil { return err } + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.NewService(svc) } @@ -67,6 +71,8 @@ func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error { if err != nil { return err } + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.UpdateService(svc) } @@ -76,6 +82,8 @@ func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error { if err != nil { return err } + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.DelService(svc) } @@ -85,7 +93,10 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error if err != nil { return nil, err } + runner.mu.Lock() ipvsSvc, err := runner.ipvsHandle.GetService(svc) + runner.mu.Unlock() + if err != nil { return nil, err } @@ -98,7 +109,9 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error // GetVirtualServers is part of ipvs.Interface. func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) { + runner.mu.Lock() ipvsSvcs, err := runner.ipvsHandle.GetServices() + runner.mu.Unlock() if err != nil { return nil, err } @@ -115,6 +128,8 @@ func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) { // Flush is part of ipvs.Interface. Currently we delete IPVS services one by one func (runner *runner) Flush() error { + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.Flush() } @@ -128,6 +143,8 @@ func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error { if err != nil { return err } + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.NewDestination(svc, dst) } @@ -141,6 +158,8 @@ func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error if err != nil { return err } + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.DelDestination(svc, dst) } @@ -153,6 +172,8 @@ func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error if err != nil { return err } + runner.mu.Lock() + defer runner.mu.Unlock() return runner.ipvsHandle.UpdateDestination(svc, dst) } @@ -162,7 +183,9 @@ func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) { if err != nil { return nil, err } + runner.mu.Lock() dsts, err := runner.ipvsHandle.GetDestinations(svc) + runner.mu.Unlock() if err != nil { return nil, err }