mirror of https://github.com/k3s-io/k3s
[kube-proxy/ipvs] Protect Netlink calls with a mutex
parent
aee1ab34ab
commit
1412d53b62
|
@ -23,6 +23,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
libipvs "github.com/docker/libnetwork/ipvs"
|
libipvs "github.com/docker/libnetwork/ipvs"
|
||||||
|
@ -34,6 +35,7 @@ import (
|
||||||
type runner struct {
|
type runner struct {
|
||||||
exec utilexec.Interface
|
exec utilexec.Interface
|
||||||
ipvsHandle *libipvs.Handle
|
ipvsHandle *libipvs.Handle
|
||||||
|
mu sync.Mutex // Protect Netlink calls
|
||||||
}
|
}
|
||||||
|
|
||||||
// Protocol is the IPVS service protocol type
|
// Protocol is the IPVS service protocol type
|
||||||
|
@ -58,6 +60,8 @@ func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.NewService(svc)
|
return runner.ipvsHandle.NewService(svc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,6 +71,8 @@ func (runner *runner) UpdateVirtualServer(vs *VirtualServer) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.UpdateService(svc)
|
return runner.ipvsHandle.UpdateService(svc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,6 +82,8 @@ func (runner *runner) DeleteVirtualServer(vs *VirtualServer) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.DelService(svc)
|
return runner.ipvsHandle.DelService(svc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +93,10 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
ipvsSvc, err := runner.ipvsHandle.GetService(svc)
|
ipvsSvc, err := runner.ipvsHandle.GetService(svc)
|
||||||
|
runner.mu.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -98,7 +109,9 @@ func (runner *runner) GetVirtualServer(vs *VirtualServer) (*VirtualServer, error
|
||||||
|
|
||||||
// GetVirtualServers is part of ipvs.Interface.
|
// GetVirtualServers is part of ipvs.Interface.
|
||||||
func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
|
func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
|
||||||
|
runner.mu.Lock()
|
||||||
ipvsSvcs, err := runner.ipvsHandle.GetServices()
|
ipvsSvcs, err := runner.ipvsHandle.GetServices()
|
||||||
|
runner.mu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -115,6 +128,8 @@ func (runner *runner) GetVirtualServers() ([]*VirtualServer, error) {
|
||||||
|
|
||||||
// Flush is part of ipvs.Interface. Currently we delete IPVS services one by one
|
// Flush is part of ipvs.Interface. Currently we delete IPVS services one by one
|
||||||
func (runner *runner) Flush() error {
|
func (runner *runner) Flush() error {
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.Flush()
|
return runner.ipvsHandle.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,6 +143,8 @@ func (runner *runner) AddRealServer(vs *VirtualServer, rs *RealServer) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.NewDestination(svc, dst)
|
return runner.ipvsHandle.NewDestination(svc, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,6 +158,8 @@ func (runner *runner) DeleteRealServer(vs *VirtualServer, rs *RealServer) error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.DelDestination(svc, dst)
|
return runner.ipvsHandle.DelDestination(svc, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,6 +172,8 @@ func (runner *runner) UpdateRealServer(vs *VirtualServer, rs *RealServer) error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
|
defer runner.mu.Unlock()
|
||||||
return runner.ipvsHandle.UpdateDestination(svc, dst)
|
return runner.ipvsHandle.UpdateDestination(svc, dst)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,7 +183,9 @@ func (runner *runner) GetRealServers(vs *VirtualServer) ([]*RealServer, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
runner.mu.Lock()
|
||||||
dsts, err := runner.ipvsHandle.GetDestinations(svc)
|
dsts, err := runner.ipvsHandle.GetDestinations(svc)
|
||||||
|
runner.mu.Unlock()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue