mirror of https://github.com/k3s-io/k3s
Merge pull request #45404 from wojtek-t/edge_based_winuserspace_proxy
Automatic merge from submit-queue Edge based winuserspace proxy Last PR in the series of making kube-proxy event-based. This is a sibling PR to https://github.com/kubernetes/kubernetes/pull/45356 that is already merged. The second commit is removing the code that is no longer used.pull/6/head
commit
bfa18037ce
|
@ -302,11 +302,8 @@ type ProxyServer struct {
|
|||
ResourceContainer string
|
||||
ConfigSyncPeriod time.Duration
|
||||
ServiceEventHandler proxyconfig.ServiceHandler
|
||||
// TODO: Migrate all handlers to ServiceHandler types and
|
||||
// get rid of this one.
|
||||
ServiceHandler proxyconfig.ServiceConfigHandler
|
||||
EndpointsEventHandler proxyconfig.EndpointsHandler
|
||||
HealthzServer *healthcheck.HealthzServer
|
||||
EndpointsEventHandler proxyconfig.EndpointsHandler
|
||||
HealthzServer *healthcheck.HealthzServer
|
||||
}
|
||||
|
||||
// createClients creates a kube client and an event client from the given config and masterOverride.
|
||||
|
@ -397,9 +394,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
|
|||
|
||||
var proxier proxy.ProxyProvider
|
||||
var serviceEventHandler proxyconfig.ServiceHandler
|
||||
// TODO: Migrate all handlers to ServiceHandler types and
|
||||
// get rid of this one.
|
||||
var serviceHandler proxyconfig.ServiceConfigHandler
|
||||
var endpointsEventHandler proxyconfig.EndpointsHandler
|
||||
|
||||
proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{})
|
||||
|
@ -455,7 +449,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create proxier: %v", err)
|
||||
}
|
||||
serviceHandler = proxierUserspace
|
||||
serviceEventHandler = proxierUserspace
|
||||
proxier = proxierUserspace
|
||||
} else {
|
||||
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
|
||||
|
@ -517,7 +511,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
|
|||
ResourceContainer: config.ResourceContainer,
|
||||
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
|
||||
ServiceEventHandler: serviceEventHandler,
|
||||
ServiceHandler: serviceHandler,
|
||||
EndpointsEventHandler: endpointsEventHandler,
|
||||
HealthzServer: healthzServer,
|
||||
}, nil
|
||||
|
@ -621,12 +614,7 @@ func (s *ProxyServer) Run() error {
|
|||
// only notify on changes, and the initial update (on process start) may be lost if no handlers
|
||||
// are registered yet.
|
||||
serviceConfig := proxyconfig.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
|
||||
if s.ServiceHandler != nil {
|
||||
serviceConfig.RegisterHandler(s.ServiceHandler)
|
||||
}
|
||||
if s.ServiceEventHandler != nil {
|
||||
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
|
||||
}
|
||||
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
|
||||
go serviceConfig.Run(wait.NeverStop)
|
||||
|
||||
endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
|
||||
|
|
|
@ -21,7 +21,6 @@ go_library(
|
|||
"//pkg/client/listers/core/internalversion:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
],
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -30,22 +29,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/controller"
|
||||
)
|
||||
|
||||
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
|
||||
// DEPRECATED: Use ServiceHandler instead - this will be removed soon.
|
||||
type ServiceConfigHandler interface {
|
||||
// OnServiceUpdate gets called when a service is created, removed or changed
|
||||
// on any of the configuration sources. An example is when a new service
|
||||
// comes up.
|
||||
//
|
||||
// NOTE: For efficiency, services are being passed by reference, thus,
|
||||
// OnServiceUpdate should NOT modify pointers of a given slice.
|
||||
// Those service objects are shared with other layers of the system and
|
||||
// are guaranteed to be immutable with the assumption that are also
|
||||
// not mutated by those handlers. Make a deep copy if you need to modify
|
||||
// them in your code.
|
||||
OnServiceUpdate(services []*api.Service)
|
||||
}
|
||||
|
||||
// ServiceHandler is an abstract interface of objects which receive
|
||||
// notifications about service object changes.
|
||||
type ServiceHandler interface {
|
||||
|
@ -185,11 +168,6 @@ type ServiceConfig struct {
|
|||
lister listers.ServiceLister
|
||||
listerSynced cache.InformerSynced
|
||||
eventHandlers []ServiceHandler
|
||||
// TODO: Remove as soon as we migrate everything to event handlers.
|
||||
handlers []ServiceConfigHandler
|
||||
// updates channel is used to trigger registered handlers
|
||||
updates chan struct{}
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
// NewServiceConfig creates a new ServiceConfig.
|
||||
|
@ -197,12 +175,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio
|
|||
result := &ServiceConfig{
|
||||
lister: serviceInformer.Lister(),
|
||||
listerSynced: serviceInformer.Informer().HasSynced,
|
||||
// The updates channel is used to send interrupts to the Services handler.
|
||||
// It's buffered because we never want to block for as long as there is a
|
||||
// pending interrupt, but don't want to drop them if the handler is doing
|
||||
// work.
|
||||
updates: make(chan struct{}, 1),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||
|
@ -217,12 +189,6 @@ func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPerio
|
|||
return result
|
||||
}
|
||||
|
||||
// RegisterHandler registers a handler which is called on every services change.
|
||||
// DEPRECATED: Use RegisterEventHandler instead - this will be removed soon.
|
||||
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
|
||||
c.handlers = append(c.handlers, handler)
|
||||
}
|
||||
|
||||
// RegisterEventHandler registers a handler which is called on every service change.
|
||||
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
|
||||
c.eventHandlers = append(c.eventHandlers, handler)
|
||||
|
@ -240,40 +206,12 @@ func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
|
|||
return
|
||||
}
|
||||
|
||||
// We have synced informers. Now we can start delivering updates
|
||||
// to the registered handler.
|
||||
for i := range c.eventHandlers {
|
||||
glog.V(3).Infof("Calling handler.OnServiceSynced()")
|
||||
c.eventHandlers[i].OnServiceSynced()
|
||||
}
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
for {
|
||||
select {
|
||||
case <-c.updates:
|
||||
services, err := c.lister.List(labels.Everything())
|
||||
if err != nil {
|
||||
glog.Errorf("Error while listing services from cache: %v", err)
|
||||
// This will cause a retry (if there isnt' any other trigger in-flight).
|
||||
c.dispatchUpdate()
|
||||
continue
|
||||
}
|
||||
if services == nil {
|
||||
services = []*api.Service{}
|
||||
}
|
||||
for i := range c.handlers {
|
||||
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
|
||||
c.handlers[i].OnServiceUpdate(services)
|
||||
}
|
||||
case <-c.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Close updates channel when stopCh is closed.
|
||||
<-stopCh
|
||||
close(c.stop)
|
||||
}
|
||||
|
||||
func (c *ServiceConfig) handleAddService(obj interface{}) {
|
||||
|
@ -286,7 +224,6 @@ func (c *ServiceConfig) handleAddService(obj interface{}) {
|
|||
glog.V(4).Infof("Calling handler.OnServiceAdd")
|
||||
c.eventHandlers[i].OnServiceAdd(service)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
|
||||
|
@ -304,7 +241,6 @@ func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
|
|||
glog.V(4).Infof("Calling handler.OnServiceUpdate")
|
||||
c.eventHandlers[i].OnServiceUpdate(oldService, service)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *ServiceConfig) handleDeleteService(obj interface{}) {
|
||||
|
@ -324,16 +260,4 @@ func (c *ServiceConfig) handleDeleteService(obj interface{}) {
|
|||
glog.V(4).Infof("Calling handler.OnServiceDelete")
|
||||
c.eventHandlers[i].OnServiceDelete(service)
|
||||
}
|
||||
c.dispatchUpdate()
|
||||
}
|
||||
|
||||
func (c *ServiceConfig) dispatchUpdate() {
|
||||
select {
|
||||
case c.updates <- struct{}{}:
|
||||
// Work enqueued successfully
|
||||
case <-c.stop:
|
||||
// We're shut down / avoid logging the message below
|
||||
default:
|
||||
glog.V(4).Infof("Service handler already has a pending interrupt.")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -315,86 +315,78 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[
|
|||
return listenIPPortMap
|
||||
}
|
||||
|
||||
// OnServiceUpdate manages the active set of service proxies.
|
||||
// Active service proxies are reinitialized if found in the update set or
|
||||
// shutdown if missing from the update set.
|
||||
func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
|
||||
glog.V(4).Infof("Received update notice: %+v", services)
|
||||
activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set
|
||||
for _, service := range services {
|
||||
// if ClusterIP is "None" or empty, skip proxying
|
||||
if !helper.IsServiceIPSet(service) {
|
||||
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP)
|
||||
continue
|
||||
func (proxier *Proxier) mergeService(service *api.Service) map[ServicePortPortalName]bool {
|
||||
if service == nil {
|
||||
return nil
|
||||
}
|
||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if !helper.IsServiceIPSet(service) {
|
||||
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
||||
return nil
|
||||
}
|
||||
existingPortPortals := make(map[ServicePortPortalName]bool)
|
||||
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
// create a slice of all the source IPs to use for service port portals
|
||||
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
|
||||
protocol := servicePort.Protocol
|
||||
|
||||
for listenIP, listenPort := range listenIPPortMap {
|
||||
servicePortPortalName := ServicePortPortalName{
|
||||
NamespacedName: svcName,
|
||||
Port: servicePort.Name,
|
||||
PortalIPName: listenIP,
|
||||
}
|
||||
existingPortPortals[servicePortPortalName] = true
|
||||
info, exists := proxier.getServiceInfo(servicePortPortalName)
|
||||
if exists && sameConfig(info, service, protocol, listenPort) {
|
||||
// Nothing changed.
|
||||
continue
|
||||
}
|
||||
if exists {
|
||||
glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
|
||||
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
|
||||
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
|
||||
}
|
||||
}
|
||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol)
|
||||
info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
|
||||
continue
|
||||
}
|
||||
info.sessionAffinityType = service.Spec.SessionAffinity
|
||||
glog.V(10).Infof("info: %#v", info)
|
||||
}
|
||||
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
// create a slice of all the source IPs to use for service port portals
|
||||
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
|
||||
protocol := servicePort.Protocol
|
||||
|
||||
for listenIP, listenPort := range listenIPPortMap {
|
||||
servicePortPortalName := ServicePortPortalName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: service.Namespace,
|
||||
Name: service.Name,
|
||||
},
|
||||
Port: servicePort.Name,
|
||||
PortalIPName: listenIP,
|
||||
}
|
||||
activeServicePortPortals[servicePortPortalName] = true
|
||||
info, exists := proxier.getServiceInfo(servicePortPortalName)
|
||||
if exists && sameConfig(info, service, protocol, listenPort) {
|
||||
// Nothing changed.
|
||||
continue
|
||||
}
|
||||
if exists {
|
||||
glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
|
||||
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
|
||||
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
|
||||
}
|
||||
}
|
||||
glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol)
|
||||
info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
|
||||
continue
|
||||
}
|
||||
info.sessionAffinityType = service.Spec.SessionAffinity
|
||||
glog.V(10).Infof("info: %#v", info)
|
||||
}
|
||||
if len(listenIPPortMap) > 0 {
|
||||
// only one loadbalancer per service port portal
|
||||
servicePortName := proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: service.Namespace,
|
||||
Name: service.Name,
|
||||
},
|
||||
Port: servicePort.Name,
|
||||
}
|
||||
proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes)
|
||||
if len(listenIPPortMap) > 0 {
|
||||
// only one loadbalancer per service port portal
|
||||
servicePortName := proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: service.Namespace,
|
||||
Name: service.Name,
|
||||
},
|
||||
Port: servicePort.Name,
|
||||
}
|
||||
proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes)
|
||||
}
|
||||
}
|
||||
|
||||
for name, info := range proxier.serviceMap {
|
||||
if !activeServicePortPortals[name] {
|
||||
glog.V(1).Infof("Stopping service %q", name)
|
||||
return existingPortPortals
|
||||
}
|
||||
|
||||
if err := proxier.closeServicePortPortal(name, info); err != nil {
|
||||
glog.Errorf("Failed to close service port portal %q: %v", name, err)
|
||||
}
|
||||
}
|
||||
func (proxier *Proxier) unmergeService(service *api.Service, existingPortPortals map[ServicePortPortalName]bool) {
|
||||
if service == nil {
|
||||
return
|
||||
}
|
||||
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
|
||||
if !helper.IsServiceIPSet(service) {
|
||||
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
|
||||
return
|
||||
}
|
||||
|
||||
proxier.mu.Lock()
|
||||
defer proxier.mu.Unlock()
|
||||
|
||||
// servicePortNameMap tracks all service port portals with the same name/port.
|
||||
// A value of true means there is one or more service port portals with name/port pair.
|
||||
servicePortNameMap := make(map[proxy.ServicePortName]bool)
|
||||
for name := range proxier.serviceMap {
|
||||
for name := range existingPortPortals {
|
||||
servicePortName := proxy.ServicePortName{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: name.Namespace,
|
||||
|
@ -402,17 +394,60 @@ func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
|
|||
},
|
||||
Port: name.Port,
|
||||
}
|
||||
servicePortNameMap[servicePortName] = servicePortNameMap[servicePortName] || activeServicePortPortals[name]
|
||||
servicePortNameMap[servicePortName] = true
|
||||
}
|
||||
|
||||
// Only delete load balancer if all listen ips per name/port show inactive.
|
||||
for name := range servicePortNameMap {
|
||||
if !servicePortNameMap[name] {
|
||||
proxier.loadBalancer.DeleteService(name)
|
||||
for i := range service.Spec.Ports {
|
||||
servicePort := &service.Spec.Ports[i]
|
||||
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
|
||||
// create a slice of all the source IPs to use for service port portals
|
||||
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
|
||||
|
||||
for listenIP := range listenIPPortMap {
|
||||
servicePortPortalName := ServicePortPortalName{
|
||||
NamespacedName: svcName,
|
||||
Port: servicePort.Name,
|
||||
PortalIPName: listenIP,
|
||||
}
|
||||
if existingPortPortals[servicePortPortalName] {
|
||||
continue
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Stopping service %q", servicePortPortalName)
|
||||
info, exists := proxier.getServiceInfo(servicePortPortalName)
|
||||
if !exists {
|
||||
glog.Errorf("Service %q is being removed but doesn't exist", servicePortPortalName)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
|
||||
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Only delete load balancer if all listen ips per name/port show inactive.
|
||||
if !servicePortNameMap[serviceName] {
|
||||
proxier.loadBalancer.DeleteService(serviceName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
|
||||
_ = proxier.mergeService(service)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
|
||||
existingPortPortals := proxier.mergeService(service)
|
||||
proxier.unmergeService(oldService, existingPortPortals)
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
|
||||
proxier.unmergeService(service, map[ServicePortPortalName]bool{})
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnServiceSynced() {
|
||||
}
|
||||
|
||||
func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool {
|
||||
return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
|
||||
}
|
||||
|
|
|
@ -341,7 +341,7 @@ func TestMultiPortProxy(t *testing.T) {
|
|||
waitForNumProxyLoops(t, p, 2)
|
||||
}
|
||||
|
||||
func TestMultiPortOnServiceUpdate(t *testing.T) {
|
||||
func TestMultiPortOnServiceAdd(t *testing.T) {
|
||||
lb := NewLoadBalancerRR()
|
||||
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
|
||||
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
|
||||
|
@ -354,7 +354,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
|
|||
}
|
||||
waitForNumProxyLoops(t, p, 0)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
p.OnServiceAdd(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
|
@ -365,7 +365,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
|
|||
Port: 0,
|
||||
Protocol: "UDP",
|
||||
}}},
|
||||
}})
|
||||
})
|
||||
waitForNumProxyLoops(t, p, 2)
|
||||
|
||||
servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP}
|
||||
|
@ -515,7 +515,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
|
|||
conn.Close()
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{})
|
||||
p.OnServiceDelete(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
})
|
||||
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
@ -552,7 +559,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
|
|||
conn.Close()
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{})
|
||||
p.OnServiceDelete(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "UDP",
|
||||
}}},
|
||||
})
|
||||
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
@ -590,7 +604,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||
conn.Close()
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{})
|
||||
p.OnServiceDelete(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
})
|
||||
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
@ -598,14 +619,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||
|
||||
// need to add endpoint here because it got clean up during service delete
|
||||
lb.OnEndpointsAdd(endpoint)
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
p.OnServiceAdd(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}})
|
||||
})
|
||||
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
|
||||
if !exists {
|
||||
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
|
||||
|
@ -645,7 +666,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||
conn.Close()
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{})
|
||||
p.OnServiceDelete(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "UDP",
|
||||
}}},
|
||||
})
|
||||
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
}
|
||||
|
@ -653,14 +681,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
|
|||
|
||||
// need to add endpoint here because it got clean up during service delete
|
||||
lb.OnEndpointsAdd(endpoint)
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
p.OnServiceAdd(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "UDP",
|
||||
}}},
|
||||
}})
|
||||
})
|
||||
svcInfo, exists := p.getServiceInfo(servicePortPortalName)
|
||||
if !exists {
|
||||
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
|
||||
|
@ -695,14 +723,14 @@ func TestTCPProxyUpdatePort(t *testing.T) {
|
|||
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
p.OnServiceAdd(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: 0,
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}})
|
||||
})
|
||||
// Wait for the socket to actually get free.
|
||||
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
|
@ -742,14 +770,14 @@ func TestUDPProxyUpdatePort(t *testing.T) {
|
|||
}
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
p.OnServiceAdd(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: 0,
|
||||
Protocol: "UDP",
|
||||
}}},
|
||||
}})
|
||||
})
|
||||
// Wait for the socket to actually get free.
|
||||
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
|
@ -788,7 +816,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
|
|||
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
p.OnServiceAdd(&api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{{
|
||||
|
@ -799,7 +827,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
|
|||
ClusterIP: svcInfo.portal.ip,
|
||||
ExternalIPs: []string{"0.0.0.0"},
|
||||
},
|
||||
}})
|
||||
})
|
||||
// Wait for the socket to actually get free.
|
||||
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
|
||||
t.Fatalf(err.Error())
|
||||
|
@ -841,40 +869,53 @@ func TestProxyUpdatePortal(t *testing.T) {
|
|||
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
|
||||
waitForNumProxyLoops(t, p, 1)
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(svcInfo.portal.port),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}})
|
||||
_, exists := p.getServiceInfo(servicePortPortalName)
|
||||
if exists {
|
||||
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
|
||||
}
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}})
|
||||
_, exists = p.getServiceInfo(servicePortPortalName)
|
||||
if exists {
|
||||
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
|
||||
}
|
||||
|
||||
p.OnServiceUpdate([]*api.Service{{
|
||||
svcv0 := &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(svcInfo.portal.port),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}})
|
||||
}
|
||||
|
||||
svcv1 := &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(svcInfo.portal.port),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}
|
||||
|
||||
p.OnServiceUpdate(svcv0, svcv1)
|
||||
_, exists := p.getServiceInfo(servicePortPortalName)
|
||||
if exists {
|
||||
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
|
||||
}
|
||||
|
||||
svcv2 := &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}
|
||||
p.OnServiceUpdate(svcv1, svcv2)
|
||||
_, exists = p.getServiceInfo(servicePortPortalName)
|
||||
if exists {
|
||||
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
|
||||
}
|
||||
|
||||
svcv3 := &api.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
|
||||
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
|
||||
Name: "p",
|
||||
Port: int32(svcInfo.portal.port),
|
||||
Protocol: "TCP",
|
||||
}}},
|
||||
}
|
||||
p.OnServiceUpdate(svcv2, svcv3)
|
||||
lb.OnEndpointsAdd(endpoint)
|
||||
svcInfo, exists = p.getServiceInfo(servicePortPortalName)
|
||||
if !exists {
|
||||
|
|
Loading…
Reference in New Issue