Switch winuserspace proxy to be event based for services

pull/6/head
Wojciech Tyczynski 2017-05-05 10:56:03 +02:00
parent 3fbfafdd0a
commit 57d35d5acb
3 changed files with 198 additions and 122 deletions

View File

@ -455,7 +455,7 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
serviceHandler = proxierUserspace serviceEventHandler = proxierUserspace
proxier = proxierUserspace proxier = proxierUserspace
} else { } else {
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for

View File

@ -315,86 +315,78 @@ func getListenIPPortMap(service *api.Service, listenPort int, nodePort int) map[
return listenIPPortMap return listenIPPortMap
} }
// OnServiceUpdate manages the active set of service proxies. func (proxier *Proxier) mergeService(service *api.Service) map[ServicePortPortalName]bool {
// Active service proxies are reinitialized if found in the update set or if service == nil {
// shutdown if missing from the update set. return nil
func (proxier *Proxier) OnServiceUpdate(services []*api.Service) { }
glog.V(4).Infof("Received update notice: %+v", services) svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
activeServicePortPortals := make(map[ServicePortPortalName]bool) // use a map as a set if !helper.IsServiceIPSet(service) {
for _, service := range services { glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
// if ClusterIP is "None" or empty, skip proxying return nil
if !helper.IsServiceIPSet(service) { }
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", types.NamespacedName{Namespace: service.Namespace, Name: service.Name}, service.Spec.ClusterIP) existingPortPortals := make(map[ServicePortPortalName]bool)
continue
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
// create a slice of all the source IPs to use for service port portals
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
protocol := servicePort.Protocol
for listenIP, listenPort := range listenIPPortMap {
servicePortPortalName := ServicePortPortalName{
NamespacedName: svcName,
Port: servicePort.Name,
PortalIPName: listenIP,
}
existingPortPortals[servicePortPortalName] = true
info, exists := proxier.getServiceInfo(servicePortPortalName)
if exists && sameConfig(info, service, protocol, listenPort) {
// Nothing changed.
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol)
info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
continue
}
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(10).Infof("info: %#v", info)
} }
if len(listenIPPortMap) > 0 {
for i := range service.Spec.Ports { // only one loadbalancer per service port portal
servicePort := &service.Spec.Ports[i] servicePortName := proxy.ServicePortName{
// create a slice of all the source IPs to use for service port portals NamespacedName: types.NamespacedName{
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort)) Namespace: service.Namespace,
protocol := servicePort.Protocol Name: service.Name,
},
for listenIP, listenPort := range listenIPPortMap { Port: servicePort.Name,
servicePortPortalName := ServicePortPortalName{
NamespacedName: types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
Port: servicePort.Name,
PortalIPName: listenIP,
}
activeServicePortPortals[servicePortPortalName] = true
info, exists := proxier.getServiceInfo(servicePortPortalName)
if exists && sameConfig(info, service, protocol, listenPort) {
// Nothing changed.
continue
}
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", servicePortPortalName)
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
}
}
glog.V(1).Infof("Adding new service %q at %s:%d/%s", servicePortPortalName, listenIP, listenPort, protocol)
info, err := proxier.addServicePortPortal(servicePortPortalName, protocol, listenIP, listenPort, proxier.udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to start proxy for %q: %v", servicePortPortalName, err)
continue
}
info.sessionAffinityType = service.Spec.SessionAffinity
glog.V(10).Infof("info: %#v", info)
}
if len(listenIPPortMap) > 0 {
// only one loadbalancer per service port portal
servicePortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
Port: servicePort.Name,
}
proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes)
} }
proxier.loadBalancer.NewService(servicePortName, service.Spec.SessionAffinity, stickyMaxAgeMinutes)
} }
} }
for name, info := range proxier.serviceMap { return existingPortPortals
if !activeServicePortPortals[name] { }
glog.V(1).Infof("Stopping service %q", name)
if err := proxier.closeServicePortPortal(name, info); err != nil { func (proxier *Proxier) unmergeService(service *api.Service, existingPortPortals map[ServicePortPortalName]bool) {
glog.Errorf("Failed to close service port portal %q: %v", name, err) if service == nil {
} return
} }
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if !helper.IsServiceIPSet(service) {
glog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
return
} }
proxier.mu.Lock()
defer proxier.mu.Unlock()
// servicePortNameMap tracks all service port portals with the same name/port.
// A value of true means there is one or more service port portals with name/port pair.
servicePortNameMap := make(map[proxy.ServicePortName]bool) servicePortNameMap := make(map[proxy.ServicePortName]bool)
for name := range proxier.serviceMap { for name := range existingPortPortals {
servicePortName := proxy.ServicePortName{ servicePortName := proxy.ServicePortName{
NamespacedName: types.NamespacedName{ NamespacedName: types.NamespacedName{
Namespace: name.Namespace, Namespace: name.Namespace,
@ -402,17 +394,60 @@ func (proxier *Proxier) OnServiceUpdate(services []*api.Service) {
}, },
Port: name.Port, Port: name.Port,
} }
servicePortNameMap[servicePortName] = servicePortNameMap[servicePortName] || activeServicePortPortals[name] servicePortNameMap[servicePortName] = true
} }
// Only delete load balancer if all listen ips per name/port show inactive. for i := range service.Spec.Ports {
for name := range servicePortNameMap { servicePort := &service.Spec.Ports[i]
if !servicePortNameMap[name] { serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
proxier.loadBalancer.DeleteService(name) // create a slice of all the source IPs to use for service port portals
listenIPPortMap := getListenIPPortMap(service, int(servicePort.Port), int(servicePort.NodePort))
for listenIP := range listenIPPortMap {
servicePortPortalName := ServicePortPortalName{
NamespacedName: svcName,
Port: servicePort.Name,
PortalIPName: listenIP,
}
if existingPortPortals[servicePortPortalName] {
continue
}
glog.V(1).Infof("Stopping service %q", servicePortPortalName)
info, exists := proxier.getServiceInfo(servicePortPortalName)
if !exists {
glog.Errorf("Service %q is being removed but doesn't exist", servicePortPortalName)
continue
}
if err := proxier.closeServicePortPortal(servicePortPortalName, info); err != nil {
glog.Errorf("Failed to close service port portal %q: %v", servicePortPortalName, err)
}
}
// Only delete load balancer if all listen ips per name/port show inactive.
if !servicePortNameMap[serviceName] {
proxier.loadBalancer.DeleteService(serviceName)
} }
} }
} }
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
_ = proxier.mergeService(service)
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
existingPortPortals := proxier.mergeService(service)
proxier.unmergeService(oldService, existingPortPortals)
}
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
proxier.unmergeService(service, map[ServicePortPortalName]bool{})
}
func (proxier *Proxier) OnServiceSynced() {
}
func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool { func sameConfig(info *serviceInfo, service *api.Service, protocol api.Protocol, listenPort int) bool {
return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity return info.protocol == protocol && info.portal.port == listenPort && info.sessionAffinityType == service.Spec.SessionAffinity
} }

View File

@ -341,7 +341,7 @@ func TestMultiPortProxy(t *testing.T) {
waitForNumProxyLoops(t, p, 2) waitForNumProxyLoops(t, p, 2)
} }
func TestMultiPortOnServiceUpdate(t *testing.T) { func TestMultiPortOnServiceAdd(t *testing.T) {
lb := NewLoadBalancerRR() lb := NewLoadBalancerRR()
serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "q"}
@ -354,7 +354,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 0) waitForNumProxyLoops(t, p, 0)
p.OnServiceUpdate([]*api.Service{{ p.OnServiceAdd(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace},
Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: "0.0.0.0", Ports: []api.ServicePort{{
Name: "p", Name: "p",
@ -365,7 +365,7 @@ func TestMultiPortOnServiceUpdate(t *testing.T) {
Port: 0, Port: 0,
Protocol: "UDP", Protocol: "UDP",
}}}, }}},
}}) })
waitForNumProxyLoops(t, p, 2) waitForNumProxyLoops(t, p, 2)
servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP} servicePortPortalNameP := ServicePortPortalName{NamespacedName: types.NamespacedName{Namespace: serviceP.Namespace, Name: serviceP.Name}, Port: serviceP.Port, PortalIPName: listenIP}
@ -515,7 +515,14 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{}) p.OnServiceDelete(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP",
}}},
})
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -552,7 +559,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{}) p.OnServiceDelete(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "UDP",
}}},
})
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -590,7 +604,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{}) p.OnServiceDelete(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP",
}}},
})
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -598,14 +619,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
p.OnServiceUpdate([]*api.Service{{ p.OnServiceAdd(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP", Protocol: "TCP",
}}}, }}},
}}) })
svcInfo, exists := p.getServiceInfo(servicePortPortalName) svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists { if !exists {
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
@ -645,7 +666,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
conn.Close() conn.Close()
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{}) p.OnServiceDelete(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "UDP",
}}},
})
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
@ -653,14 +681,14 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
// need to add endpoint here because it got clean up during service delete // need to add endpoint here because it got clean up during service delete
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
p.OnServiceUpdate([]*api.Service{{ p.OnServiceAdd(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())), Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "UDP", Protocol: "UDP",
}}}, }}},
}}) })
svcInfo, exists := p.getServiceInfo(servicePortPortalName) svcInfo, exists := p.getServiceInfo(servicePortPortalName)
if !exists { if !exists {
t.Fatalf("can't find serviceInfo for %s", servicePortPortalName) t.Fatalf("can't find serviceInfo for %s", servicePortPortalName)
@ -695,14 +723,14 @@ func TestTCPProxyUpdatePort(t *testing.T) {
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{{ p.OnServiceAdd(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
Port: 0, Port: 0,
Protocol: "TCP", Protocol: "TCP",
}}}, }}},
}}) })
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
@ -742,14 +770,14 @@ func TestUDPProxyUpdatePort(t *testing.T) {
} }
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{{ p.OnServiceAdd(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
Port: 0, Port: 0,
Protocol: "UDP", Protocol: "UDP",
}}}, }}},
}}) })
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortUDP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
@ -788,7 +816,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{{ p.OnServiceAdd(&api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ Spec: api.ServiceSpec{
Ports: []api.ServicePort{{ Ports: []api.ServicePort{{
@ -799,7 +827,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
ClusterIP: svcInfo.portal.ip, ClusterIP: svcInfo.portal.ip,
ExternalIPs: []string{"0.0.0.0"}, ExternalIPs: []string{"0.0.0.0"},
}, },
}}) })
// Wait for the socket to actually get free. // Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil { if err := waitForClosedPortTCP(p, getPortNum(t, svcInfo.socket.Addr().String())); err != nil {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
@ -841,40 +869,53 @@ func TestProxyUpdatePortal(t *testing.T) {
testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String())) testEchoTCP(t, "127.0.0.1", getPortNum(t, svcInfo.socket.Addr().String()))
waitForNumProxyLoops(t, p, 1) waitForNumProxyLoops(t, p, 1)
p.OnServiceUpdate([]*api.Service{{ svcv0 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.portal.port),
Protocol: "TCP",
}}},
}})
_, exists := p.getServiceInfo(servicePortPortalName)
if exists {
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
}
p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP",
}}},
}})
_, exists = p.getServiceInfo(servicePortPortalName)
if exists {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
}
p.OnServiceUpdate([]*api.Service{{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p", Name: "p",
Port: int32(svcInfo.portal.port), Port: int32(svcInfo.portal.port),
Protocol: "TCP", Protocol: "TCP",
}}}, }}},
}}) }
svcv1 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "", Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.portal.port),
Protocol: "TCP",
}}},
}
p.OnServiceUpdate(svcv0, svcv1)
_, exists := p.getServiceInfo(servicePortPortalName)
if exists {
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
}
svcv2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: "None", Ports: []api.ServicePort{{
Name: "p",
Port: int32(getPortNum(t, svcInfo.socket.Addr().String())),
Protocol: "TCP",
}}},
}
p.OnServiceUpdate(svcv1, svcv2)
_, exists = p.getServiceInfo(servicePortPortalName)
if exists {
t.Fatalf("service with 'None' as ClusterIP should not be included in the proxy")
}
svcv3 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{
Name: "p",
Port: int32(svcInfo.portal.port),
Protocol: "TCP",
}}},
}
p.OnServiceUpdate(svcv2, svcv3)
lb.OnEndpointsAdd(endpoint) lb.OnEndpointsAdd(endpoint)
svcInfo, exists = p.getServiceInfo(servicePortPortalName) svcInfo, exists = p.getServiceInfo(servicePortPortalName)
if !exists { if !exists {