proxy: consolidate ServicesHandler/EndpointsHandler into ProxyProvider

Proxies should be able to cleanly figure out when endpoints have been synced,
so make all ProxyProviders also implement EndpointsHandler and pass those
through to loadbalancers when required.
k3s-v1.15.3
Dan Williams 2018-12-04 23:35:34 -06:00
parent 92a2076149
commit 4b07f80d20
13 changed files with 80 additions and 81 deletions

View File

@ -501,8 +501,6 @@ type ProxyServer struct {
OOMScoreAdj *int32
ResourceContainer string
ConfigSyncPeriod time.Duration
ServiceEventHandler config.ServiceHandler
EndpointsEventHandler config.EndpointsHandler
HealthzServer *healthcheck.HealthzServer
}
@ -657,11 +655,11 @@ func (s *ProxyServer) Run() error {
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
serviceConfig.RegisterEventHandler(s.Proxier)
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
endpointsConfig.RegisterEventHandler(s.Proxier)
go endpointsConfig.Run(wait.NeverStop)
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those

View File

@ -33,7 +33,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/ipvs"
@ -135,8 +134,6 @@ func newProxyServer(
}
var proxier proxy.ProxyProvider
var serviceEventHandler proxyconfig.ServiceHandler
var endpointsEventHandler proxyconfig.EndpointsHandler
proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
nodeIP := net.ParseIP(config.BindAddress)
@ -151,7 +148,7 @@ func newProxyServer(
}
// TODO this has side effects that should only happen when Run() is invoked.
proxierIPTables, err := iptables.NewProxier(
proxier, err = iptables.NewProxier(
iptInterface,
utilsysctl.New(),
execer,
@ -170,12 +167,9 @@ func newProxyServer(
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
metrics.RegisterMetrics()
proxier = proxierIPTables
serviceEventHandler = proxierIPTables
endpointsEventHandler = proxierIPTables
} else if proxyMode == proxyModeIPVS {
klog.V(0).Info("Using ipvs Proxier.")
proxierIPVS, err := ipvs.NewProxier(
proxier, err = ipvs.NewProxier(
iptInterface,
ipvsInterface,
ipsetInterface,
@ -199,20 +193,12 @@ func newProxyServer(
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
metrics.RegisterMetrics()
proxier = proxierIPVS
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
} else {
klog.V(0).Info("Using userspace Proxier.")
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler.
loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer
endpointsEventHandler = loadBalancer
// TODO this has side effects that should only happen when Run() is invoked.
proxierUserspace, err := userspace.NewProxier(
loadBalancer,
proxier, err = userspace.NewProxier(
userspace.NewLoadBalancerRR(),
net.ParseIP(config.BindAddress),
iptInterface,
execer,
@ -225,8 +211,6 @@ func newProxyServer(
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
serviceEventHandler = proxierUserspace
proxier = proxierUserspace
}
iptInterface.AddReloadFunc(proxier.Sync)
@ -250,8 +234,6 @@ func newProxyServer(
OOMScoreAdj: config.OOMScoreAdj,
ResourceContainer: config.ResourceContainer,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
ServiceEventHandler: serviceEventHandler,
EndpointsEventHandler: endpointsEventHandler,
HealthzServer: healthzServer,
}, nil
}

View File

@ -33,7 +33,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/proxy"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/winkernel"
"k8s.io/kubernetes/pkg/proxy/winuserspace"
@ -94,13 +93,11 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
}
var proxier proxy.ProxyProvider
var serviceEventHandler proxyconfig.ServiceHandler
var endpointsEventHandler proxyconfig.EndpointsHandler
proxyMode := getProxyMode(string(config.Mode), winkernel.WindowsKernelCompatTester{})
if proxyMode == proxyModeKernelspace {
klog.V(0).Info("Using Kernelspace Proxier.")
proxierKernelspace, err := winkernel.NewProxier(
proxier, err = winkernel.NewProxier(
config.IPTables.SyncPeriod.Duration,
config.IPTables.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
@ -115,23 +112,14 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier = proxierKernelspace
endpointsEventHandler = proxierKernelspace
serviceEventHandler = proxierKernelspace
} else {
klog.V(0).Info("Using userspace Proxier.")
execer := exec.New()
var netshInterface utilnetsh.Interface
netshInterface = utilnetsh.New(execer)
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsConfigHandler.
loadBalancer := winuserspace.NewLoadBalancerRR()
// set EndpointsConfigHandler to our loadBalancer
endpointsEventHandler = loadBalancer
proxierUserspace, err := winuserspace.NewProxier(
loadBalancer,
proxier, err = winuserspace.NewProxier(
winuserspace.NewLoadBalancerRR(),
net.ParseIP(config.BindAddress),
netshInterface,
*utilnet.ParsePortRangeOrDie(config.PortRange),
@ -142,26 +130,22 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier = proxierUserspace
serviceEventHandler = proxierUserspace
}
return &ProxyServer{
Client: client,
EventClient: eventClient,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ResourceContainer: config.ResourceContainer,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
ServiceEventHandler: serviceEventHandler,
EndpointsEventHandler: endpointsEventHandler,
HealthzServer: healthzServer,
Client: client,
EventClient: eventClient,
Proxier: proxier,
Broadcaster: eventBroadcaster,
Recorder: recorder,
ProxyMode: proxyMode,
NodeRef: nodeRef,
MetricsBindAddress: config.MetricsBindAddress,
EnableProfiling: config.EnableProfiling,
OOMScoreAdj: config.OOMScoreAdj,
ResourceContainer: config.ResourceContainer,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer,
}, nil
}

View File

@ -26,7 +26,6 @@ go_library(
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/iptables:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/mount:go_default_library",

View File

@ -27,7 +27,6 @@ import (
"k8s.io/client-go/tools/record"
proxyapp "k8s.io/kubernetes/cmd/kube-proxy/app"
"k8s.io/kubernetes/pkg/proxy"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/iptables"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnode "k8s.io/kubernetes/pkg/util/node"
@ -72,13 +71,12 @@ func NewHollowProxyOrDie(
) (*HollowProxy, error) {
// Create proxier and service/endpoint handlers.
var proxier proxy.ProxyProvider
var serviceHandler proxyconfig.ServiceHandler
var endpointsHandler proxyconfig.EndpointsHandler
var err error
if useRealProxier {
// Real proxier with fake iptables, sysctl, etc underneath it.
//var err error
proxierIPTables, err := iptables.NewProxier(
proxier, err = iptables.NewProxier(
iptInterface,
sysctl,
execer,
@ -96,13 +94,8 @@ func NewHollowProxyOrDie(
if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err)
}
proxier = proxierIPTables
serviceHandler = proxierIPTables
endpointsHandler = proxierIPTables
} else {
proxier = &FakeProxier{}
serviceHandler = &FakeProxier{}
endpointsHandler = &FakeProxier{}
}
// Create a Hollow Proxy instance.
@ -114,19 +107,17 @@ func NewHollowProxyOrDie(
}
return &HollowProxy{
ProxyServer: &proxyapp.ProxyServer{
Client: client,
EventClient: eventClient,
IptInterface: iptInterface,
Proxier: proxier,
Broadcaster: broadcaster,
Recorder: recorder,
ProxyMode: "fake",
NodeRef: nodeRef,
OOMScoreAdj: utilpointer.Int32Ptr(0),
ResourceContainer: "",
ConfigSyncPeriod: 30 * time.Second,
ServiceEventHandler: serviceHandler,
EndpointsEventHandler: endpointsHandler,
Client: client,
EventClient: eventClient,
IptInterface: iptInterface,
Proxier: proxier,
Broadcaster: broadcaster,
Recorder: recorder,
ProxyMode: "fake",
NodeRef: nodeRef,
OOMScoreAdj: utilpointer.Int32Ptr(0),
ResourceContainer: "",
ConfigSyncPeriod: 30 * time.Second,
},
}, nil
}

View File

@ -17,6 +17,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/proxy",
deps = [
"//pkg/api/v1/service:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",

View File

@ -21,10 +21,14 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/proxy/config"
)
// ProxyProvider is the interface provided by proxier implementations.
type ProxyProvider interface {
config.EndpointsHandler
config.ServiceHandler
// Sync immediately synchronizes the ProxyProvider's current state to proxy rules.
Sync()
// SyncLoop runs periodic work.

View File

@ -21,6 +21,7 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library",

View File

@ -19,6 +19,7 @@ package userspace
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/proxy"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"net"
)
@ -31,4 +32,6 @@ type LoadBalancer interface {
DeleteService(service proxy.ServicePortName)
CleanupStaleStickySessions(service proxy.ServicePortName)
ServiceHasEndpoints(service proxy.ServicePortName) bool
proxyconfig.EndpointsHandler
}

View File

@ -526,6 +526,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() {
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
proxier.loadBalancer.OnEndpointsAdd(endpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
proxier.loadBalancer.OnEndpointsDelete(endpoints)
}
func (proxier *Proxier) OnEndpointsSynced() {
proxier.loadBalancer.OnEndpointsSynced()
}
func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {
if info.protocol != port.Protocol || info.portal.port != int(port.Port) || info.nodePort != int(port.NodePort) {
return false

View File

@ -19,6 +19,7 @@ go_library(
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/util/ipconfig:go_default_library",
"//pkg/util/netsh:go_default_library",
"//pkg/util/slice:go_default_library",

View File

@ -19,6 +19,7 @@ package winuserspace
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/proxy"
proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
"net"
)
@ -30,4 +31,6 @@ type LoadBalancer interface {
NewService(service proxy.ServicePortName, sessionAffinityType v1.ServiceAffinity, stickyMaxAgeMinutes int) error
DeleteService(service proxy.ServicePortName)
CleanupStaleStickySessions(service proxy.ServicePortName)
proxyconfig.EndpointsHandler
}

View File

@ -444,6 +444,22 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() {
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
proxier.loadBalancer.OnEndpointsAdd(endpoints)
}
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
proxier.loadBalancer.OnEndpointsUpdate(oldEndpoints, endpoints)
}
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
proxier.loadBalancer.OnEndpointsDelete(endpoints)
}
func (proxier *Proxier) OnEndpointsSynced() {
proxier.loadBalancer.OnEndpointsSynced()
}
func sameConfig(info *serviceInfo, service *v1.Service, protocol v1.Protocol, listenPort int) bool {
return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
}