Merge pull request #4580 from thockin/plural_20_endpoints

Part 2 of plural ports: make endpoints a struct
pull/6/head
Brendan Burns 2015-02-20 15:42:19 -08:00
commit 9829128a75
33 changed files with 584 additions and 132 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package testing
import (
"fmt"
"math/rand"
"strconv"
"testing"
@ -233,6 +234,11 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
s.Type = api.SecretTypeOpaque
},
func(ep *api.Endpoint, c fuzz.Continue) {
// TODO: If our API used a particular type for IP fields we could just catch that here.
ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256))
ep.Port = c.Rand.Intn(65536)
},
)
return f
}

View File

@ -745,12 +745,25 @@ type Service struct {
}
// Endpoints is a collection of endpoints that implement the actual service, for example:
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
// Name: "mysql", Endpoints: [{"ip": "10.10.1.1", "port": 1909}, {"ip": "10.10.2.2", "port": 8834}]
type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
Endpoints []string `json:"endpoints,omitempty"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
Endpoints []Endpoint `json:"endpoints,omitempty"`
}
// Endpoint is a single IP endpoint of a service.
type Endpoint struct {
// Required: The IP of this endpoint.
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip"`
// Required: The destination port to access.
Port int `json:"port"`
}
// EndpointsList is a list of endpoints.

View File

@ -18,6 +18,7 @@ package v1beta1
import (
"fmt"
"net"
"strconv"
newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -1113,6 +1114,48 @@ func init() {
out.TimeoutSeconds = in.TimeoutSeconds
return nil
},
func(in *newer.Endpoints, out *Endpoints, s conversion.Scope) error {
if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
return nil
},
func(in *Endpoints, out *newer.Endpoints, s conversion.Scope) error {
if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
out.Endpoints = append(out.Endpoints, newer.Endpoint{})
ep := &out.Endpoints[i]
host, port, err := net.SplitHostPort(in.Endpoints[i])
if err != nil {
return err
}
ep.IP = host
pn, err := strconv.Atoi(port)
if err != nil {
return err
}
ep.Port = pn
}
return nil
},
)
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.

View File

@ -382,3 +382,72 @@ func TestContainerConversion(t *testing.T) {
}
}
}
func TestEndpointsConversion(t *testing.T) {
testCases := []struct {
given current.Endpoints
expected newer.Endpoints
}{
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "empty",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "one",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{"1.2.3.4:88"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "several",
},
Protocol: current.ProtocolUDP,
Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolUDP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}},
},
},
}
for i, tc := range testCases {
// Convert versioned -> internal.
got := newer.Endpoints{}
if err := Convert(&tc.given, &got); err != nil {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got)
}
// Convert internal -> versioned.
got2 := current.Endpoints{}
if err := Convert(&got, &got2); err != nil {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if got2.Protocol != tc.given.Protocol || !newer.Semantic.DeepEqual(got2.Endpoints, tc.given.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.given, got2)
}
}
}

View File

@ -85,5 +85,10 @@ func init() {
obj.Type = SecretTypeOpaque
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
obj.Protocol = "TCP"
}
},
)
}

View File

@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) {
t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
}
}

View File

@ -605,6 +605,9 @@ type Service struct {
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
type Endpoints struct {
TypeMeta `json:",inline"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"`
Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"`
}

View File

@ -18,6 +18,7 @@ package v1beta2
import (
"fmt"
"net"
"strconv"
newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -1028,6 +1029,48 @@ func init() {
out.TimeoutSeconds = in.TimeoutSeconds
return nil
},
func(in *newer.Endpoints, out *Endpoints, s conversion.Scope) error {
if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
return nil
},
func(in *Endpoints, out *newer.Endpoints, s conversion.Scope) error {
if err := s.Convert(&in.TypeMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
out.Endpoints = append(out.Endpoints, newer.Endpoint{})
ep := &out.Endpoints[i]
host, port, err := net.SplitHostPort(in.Endpoints[i])
if err != nil {
return err
}
ep.IP = host
pn, err := strconv.Atoi(port)
if err != nil {
return err
}
ep.Port = pn
}
return nil
},
)
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.

View File

@ -212,3 +212,72 @@ func TestContainerConversion(t *testing.T) {
}
}
}
func TestEndpointsConversion(t *testing.T) {
testCases := []struct {
given current.Endpoints
expected newer.Endpoints
}{
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "empty",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "one",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{"1.2.3.4:88"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "several",
},
Protocol: current.ProtocolUDP,
Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolUDP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}},
},
},
}
for i, tc := range testCases {
// Convert versioned -> internal.
got := newer.Endpoints{}
if err := newer.Scheme.Convert(&tc.given, &got); err != nil {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got)
}
// Convert internal -> versioned.
got2 := current.Endpoints{}
if err := newer.Scheme.Convert(&got, &got2); err != nil {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if got2.Protocol != tc.given.Protocol || !newer.Semantic.DeepEqual(got2.Endpoints, tc.given.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.given, got2)
}
}
}

View File

@ -87,5 +87,10 @@ func init() {
obj.Type = SecretTypeOpaque
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
obj.Protocol = "TCP"
}
},
)
}

View File

@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) {
t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
}
}

View File

@ -569,6 +569,9 @@ type Service struct {
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
type Endpoints struct {
TypeMeta `json:",inline"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty" description:"IP protocol for endpoint ports; must be UDP or TCP; TCP if unspecified"`
Endpoints []string `json:"endpoints,omitempty" description:"list of endpoints corresponding to a service, of the form address:port, such as 10.10.1.1:1909"`
}

View File

@ -80,5 +80,10 @@ func init() {
obj.Type = SecretTypeOpaque
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
obj.Protocol = "TCP"
}
},
)
}

View File

@ -102,3 +102,13 @@ func TestSetDefaultSecret(t *testing.T) {
t.Errorf("Expected secret type %v, got %v", current.SecretTypeOpaque, s2.Type)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
}
}

View File

@ -777,13 +777,26 @@ type ServiceList struct {
}
// Endpoints is a collection of endpoints that implement the actual service, for example:
// Name: "mysql", Endpoints: ["10.10.1.1:1909", "10.10.2.2:8834"]
// Name: "mysql", Endpoints: [{"ip": "10.10.1.1", "port": 1909}, {"ip": "10.10.2.2", "port": 8834}]
type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata"`
ObjectMeta `json:"metadata,omitempty"`
// Endpoints is the list of host ports that satisfy the service selector
Endpoints []string `json:"endpoints"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
Endpoints []Endpoint `json:"endpoints,omitempty"`
}
// Endpoint is a single IP endpoint of a service.
type Endpoint struct {
// Required: The IP of this endpoint.
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip"`
// Required: The destination port to access.
Port int `json:"port"`
}
// EndpointsList is a list of endpoints.

View File

@ -615,7 +615,8 @@ func TestListEndpooints(t *testing.T) {
Items: []api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "endpoint-1"},
Endpoints: []string{"10.245.1.2:8080", "10.245.1.3:8080"},
Endpoints: []api.Endpoint{
{IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}},
},
},
},

View File

@ -269,7 +269,7 @@ func (d *ServiceDescriber) Describe(namespace, name string) (string, error) {
fmt.Fprintf(out, "Labels:\t%s\n", formatLabels(service.Labels))
fmt.Fprintf(out, "Selector:\t%s\n", formatLabels(service.Spec.Selector))
fmt.Fprintf(out, "Port:\t%d\n", service.Spec.Port)
fmt.Fprintf(out, "Endpoints:\t%s\n", stringList(endpoints.Endpoints))
fmt.Fprintf(out, "Endpoints:\t%s\n", formatEndpoints(endpoints.Endpoints))
if events != nil {
describeEvents(events, out)
}

View File

@ -22,8 +22,10 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"reflect"
"sort"
"strconv"
"strings"
"text/tabwriter"
"text/template"
@ -263,10 +265,15 @@ func (h *HumanReadablePrinter) printHeader(columnNames []string, w io.Writer) er
return nil
}
func stringList(list []string) string {
if len(list) == 0 {
func formatEndpoints(endpoints []api.Endpoint) string {
if len(endpoints) == 0 {
return "<empty>"
}
list := []string{}
for i := range endpoints {
ep := &endpoints[i]
list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
return strings.Join(list, ",")
}
@ -368,7 +375,7 @@ func printServiceList(list *api.ServiceList, w io.Writer) error {
}
func printEndpoints(endpoint *api.Endpoints, w io.Writer) error {
_, err := fmt.Fprintf(w, "%s\t%s\n", endpoint.Name, stringList(endpoint.Endpoints))
_, err := fmt.Fprintf(w, "%s\t%s\n", endpoint.Name, formatEndpoints(endpoint.Endpoints))
return err
}

View File

@ -452,7 +452,7 @@ func TestPrinters(t *testing.T) {
"pod": &api.Pod{ObjectMeta: om("pod")},
"emptyPodList": &api.PodList{},
"nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}},
"endpoints": &api.Endpoints{Endpoints: []string{"127.0.0.1", "localhost:8080"}},
"endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}},
}
// map of printer name to set of objects it should fail on.
expectedErrors := map[string]util.StringSet{

View File

@ -18,7 +18,6 @@ package master
import (
"net"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -40,7 +39,7 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) {
if err := m.createMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil {
glog.Errorf("Can't create rw service: %v", err)
}
if err := m.ensureEndpointsContain("kubernetes", net.JoinHostPort(m.publicIP.String(), strconv.Itoa(int(m.publicReadWritePort)))); err != nil {
if err := m.ensureEndpointsContain("kubernetes", m.publicIP, m.publicReadWritePort); err != nil {
glog.Errorf("Can't create rw endpoints: %v", err)
}
}
@ -65,7 +64,7 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) {
if err := m.createMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil {
glog.Errorf("Can't create ro service: %v", err)
}
if err := m.ensureEndpointsContain("kubernetes-ro", net.JoinHostPort(m.publicIP.String(), strconv.Itoa(int(m.publicReadOnlyPort)))); err != nil {
if err := m.ensureEndpointsContain("kubernetes-ro", m.publicIP, m.publicReadOnlyPort); err != nil {
glog.Errorf("Can't create ro endpoints: %v", err)
}
}
@ -126,26 +125,28 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I
// excess endpoints (as determined by m.masterCount). Extra endpoints could appear
// in the list if, for example, the master starts running on a different machine,
// changing IP addresses.
func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error {
func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error {
ctx := api.NewDefaultContext()
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil {
if err != nil || e.Protocol != api.ProtocolTCP {
e = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: serviceName,
Namespace: api.NamespaceDefault,
},
Protocol: api.ProtocolTCP,
}
}
found := false
for i := range e.Endpoints {
if e.Endpoints[i] == endpoint {
ep := &e.Endpoints[i]
if ep.IP == ip.String() && ep.Port == port {
found = true
break
}
}
if !found {
e.Endpoints = append(e.Endpoints, endpoint)
e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port})
}
if len(e.Endpoints) > m.masterCount {
// We append to the end and remove from the beginning, so this should

View File

@ -183,7 +183,10 @@ func TestServicesFromZeroError(t *testing.T) {
}
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}}
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
@ -230,7 +233,10 @@ func TestEndpoints(t *testing.T) {
}
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, Endpoints: []string{"127.0.0.1:9000"}}
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
}
fakeWatch := watch.NewFake()
fakeWatch.Stop()

View File

@ -218,11 +218,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint1", "endpoint2"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint3", "endpoint4"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
handler2.Wait(2)
@ -244,11 +244,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint1", "endpoint2"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint3", "endpoint4"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
handler2.Wait(2)
@ -262,7 +262,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foobar"},
Endpoints: []string{"endpoint5", "endpoint6"},
Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}},
})
handler.Wait(1)
handler2.Wait(1)
@ -274,7 +274,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint77"},
Endpoints: []api.Endpoint{{IP: "endpoint7"}},
})
handler.Wait(1)
handler2.Wait(1)

View File

@ -23,6 +23,7 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"
@ -102,8 +103,8 @@ func (fake *fakeIptables) IsIpv6() bool {
return false
}
var tcpServerPort string
var udpServerPort string
var tcpServerPort int
var udpServerPort int
func init() {
// Don't handle panics
@ -118,20 +119,28 @@ func init() {
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
_, tcpServerPort, err = net.SplitHostPort(u.Host)
_, port, err := net.SplitHostPort(u.Host)
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
tcpServerPort, err = strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
}
// UDP setup.
udp, err := newUDPEchoServer()
if err != nil {
panic(fmt.Sprintf("failed to make a UDP server: %v", err))
}
_, udpServerPort, err = net.SplitHostPort(udp.LocalAddr().String())
_, port, err = net.SplitHostPort(udp.LocalAddr().String())
if err != nil {
panic(fmt.Sprintf("failed to parse: %v", err))
}
udpServerPort, err = strconv.Atoi(port)
if err != nil {
panic(fmt.Sprintf("failed to atoi(%s): %v", port, err))
}
go udp.Loop()
}
@ -188,7 +197,7 @@ func TestTCPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -208,7 +217,7 @@ func TestUDPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -237,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -268,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -299,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -329,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -359,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -398,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@ -437,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", tcpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@ -473,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []string{net.JoinHostPort("127.0.0.1", udpServerPort)},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})

View File

@ -149,23 +149,18 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
return endpoint, nil
}
func isValidEndpoint(spec string) bool {
_, port, err := net.SplitHostPort(spec)
if err != nil {
return false
}
value, err := strconv.Atoi(port)
if err != nil {
return false
}
return value > 0
func isValidEndpoint(ep *api.Endpoint) bool {
return ep.IP != "" && ep.Port > 0
}
func filterValidEndpoints(endpoints []string) []string {
func filterValidEndpoints(endpoints []api.Endpoint) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
var result []string
for _, spec := range endpoints {
if isValidEndpoint(spec) {
result = append(result, spec)
for i := range endpoints {
ep := &endpoints[i]
if isValidEndpoint(ep) {
result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
}
return result

View File

@ -24,22 +24,28 @@ import (
)
func TestValidateWorks(t *testing.T) {
if isValidEndpoint("") {
if isValidEndpoint(&api.Endpoint{}) {
t.Errorf("Didn't fail for empty string")
}
if isValidEndpoint("foobar") {
if isValidEndpoint(&api.Endpoint{IP: "foobar"}) {
t.Errorf("Didn't fail with no port")
}
if isValidEndpoint("foobar:-1") {
if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint("foobar:8080") {
if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) {
t.Errorf("Failed a valid config.")
}
}
func TestFilterWorks(t *testing.T) {
endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"}
endpoints := []api.Endpoint{
{IP: "foobar", Port: 1},
{IP: "foobar", Port: 2},
{IP: "foobar", Port: -1},
{IP: "foobar", Port: 3},
{IP: "foobar", Port: -2},
}
filtered := filterValidEndpoints(endpoints)
if len(filtered) != 3 {
@ -88,7 +94,7 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint1:40"},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
@ -106,7 +112,11 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@ -125,7 +135,11 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@ -137,7 +151,10 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:8", "endpoint:9"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@ -146,7 +163,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
@ -164,11 +181,18 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
@ -211,7 +235,7 @@ func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1"},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
@ -234,7 +258,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@ -262,7 +290,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@ -293,7 +325,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@ -308,7 +344,10 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@ -325,7 +364,11 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:4"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 4},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@ -351,7 +394,11 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
@ -365,7 +412,10 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
@ -376,7 +426,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
@ -398,12 +448,19 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"endpoint:1", "endpoint:2", "endpoint:3"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []string{"endpoint:4", "endpoint:5"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]

View File

@ -30,7 +30,7 @@ func TestGetEndpoints(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Endpoints: api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"127.0.0.1:9000"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
},
}
storage := NewREST(registry)
@ -39,7 +39,7 @@ func TestGetEndpoints(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if !reflect.DeepEqual([]string{"127.0.0.1:9000"}, obj.(*api.Endpoints).Endpoints) {
if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}

View File

@ -598,10 +598,10 @@ func TestEtcdListEndpoints(t *testing.T) {
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:8345"}}),
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}),
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}),
},
},
},
@ -625,7 +625,8 @@ func TestEtcdGetEndpoints(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient)
endpoints := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"127.0.0.1:34855"},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")
@ -648,7 +649,8 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient)
endpoints := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []string{"baz", "bar"},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")

View File

@ -20,6 +20,7 @@ import (
"fmt"
"math/rand"
"net"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -217,16 +218,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
e, err := rs.registry.GetEndpoints(ctx, id)
eps, err := rs.registry.GetEndpoints(ctx, id)
if err != nil {
return "", err
}
if len(e.Endpoints) == 0 {
if len(eps.Endpoints) == 0 {
return "", fmt.Errorf("no endpoints available for %v", id)
}
// We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint.
return e.Endpoints[rand.Intn(len(e.Endpoints))], nil
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil
}
func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {

View File

@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) {
func TestServiceRegistryResourceLocation(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))

View File

@ -18,8 +18,6 @@ package service
import (
"fmt"
"net"
"strconv"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -64,7 +62,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
resultErr = err
continue
}
endpoints := []string{}
endpoints := []api.Endpoint{}
for _, pod := range pods.Items {
port, err := findPort(&pod, service.Spec.ContainerPort)
@ -89,7 +87,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
continue
}
endpoints = append(endpoints, net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(port)))
endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port})
}
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
@ -98,6 +96,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
ObjectMeta: api.ObjectMeta{
Name: service.Name,
},
Protocol: service.Spec.Protocol,
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
@ -113,8 +112,8 @@ func (e *EndpointController) SyncServiceEndpoints() error {
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
} else {
// Pre-existing
if endpointsEqual(currentEndpoints, endpoints) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) {
glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
continue
}
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
@ -127,24 +126,24 @@ func (e *EndpointController) SyncServiceEndpoints() error {
return resultErr
}
func containsEndpoint(endpoints *api.Endpoints, endpoint string) bool {
if endpoints == nil {
func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool {
if haystack == nil || needle == nil {
return false
}
for ix := range endpoints.Endpoints {
if endpoints.Endpoints[ix] == endpoint {
for ix := range haystack.Endpoints {
if haystack.Endpoints[ix] == *needle {
return true
}
}
return false
}
func endpointsEqual(e *api.Endpoints, endpoints []string) bool {
if len(e.Endpoints) != len(endpoints) {
func endpointsEqual(eps *api.Endpoints, endpoints []api.Endpoint) bool {
if len(eps.Endpoints) != len(endpoints) {
return false
}
for _, endpoint := range endpoints {
if !containsEndpoint(e, endpoint) {
for i := range endpoints {
if !containsEndpoint(eps, &endpoints[i]) {
return false
}
}

View File

@ -245,7 +245,72 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []string{"6.7.8.9:1000"},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsProtocolTCP(t *testing.T) {
serviceList := api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Protocol: api.ProtocolTCP,
},
},
},
}
testServer, endpointsHandler := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(0)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
endpointsHandler.ValidateRequestCount(t, 0)
}
func TestSyncEndpointsProtocolUDP(t *testing.T) {
serviceList := api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Protocol: api.ProtocolUDP,
},
},
},
}
testServer, endpointsHandler := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(0)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolUDP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -275,7 +340,8 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []string{},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -288,7 +354,8 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []string{"1.2.3.4:8080"},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data)
}
@ -314,7 +381,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []string{"6.7.8.9:1000"},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -327,7 +395,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []string{"1.2.3.4:8080"},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data)
}
@ -352,7 +421,8 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Endpoints: []string{"1.2.3.4:8080"},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -390,7 +460,8 @@ func TestSyncEndpointsItems(t *testing.T) {
ObjectMeta: api.ObjectMeta{
ResourceVersion: "",
},
Endpoints: []string{"1.2.3.4:8080"},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data)
}

View File

@ -280,7 +280,7 @@ func TestWatchEtcdState(t *testing.T) {
codec := latest.Codec
type T struct {
Type watch.EventType
Endpoints []string
Endpoints []api.Endpoint
}
testCases := map[string]struct {
Initial map[string]EtcdResponseWithError
@ -294,7 +294,7 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "create",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})),
},
},
},
@ -308,12 +308,12 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) {
},
From: 1,
Expected: []*T{
{watch.Modified, []string{"127.0.0.1:9000"}},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}},
},
},
"from initial state": {
@ -330,7 +330,7 @@ func TestWatchEtcdState(t *testing.T) {
R: &etcd.Response{
Action: "get",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@ -343,12 +343,12 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []string{}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) {
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, []string{"127.0.0.1:9000"}},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}},
},
},
}

View File

@ -18,7 +18,6 @@ package e2e
import (
"fmt"
"net"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -207,7 +206,7 @@ var _ = Describe("Services", func() {
}
_, err := c.Services(ns).Create(service)
Expect(err).NotTo(HaveOccurred())
expectedPort := "80"
expectedPort := 80
validateEndpointsOrFail(c, ns, serviceName, expectedPort, []string{})
@ -248,17 +247,13 @@ var _ = Describe("Services", func() {
}, 120.0)
})
func validateIPsOrFail(c *client.Client, ns, expectedPort string, expectedEndpoints []string, endpoints *api.Endpoints) {
func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) {
ips := util.StringSet{}
for _, spec := range endpoints.Endpoints {
host, port, err := net.SplitHostPort(spec)
if err != nil {
Fail(fmt.Sprintf("invalid endpoint spec: %s (%v)", spec, err))
for _, ep := range endpoints.Endpoints {
if ep.Port != expectedPort {
Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port))
}
if port != expectedPort {
Fail(fmt.Sprintf("invalid port, expected %s, got %s", expectedPort, port))
}
ips.Insert(host)
ips.Insert(ep.IP)
}
for _, name := range expectedEndpoints {
@ -272,7 +267,7 @@ func validateIPsOrFail(c *client.Client, ns, expectedPort string, expectedEndpoi
}
}
func validateEndpointsOrFail(c *client.Client, ns, serviceName, expectedPort string, expectedEndpoints []string) {
func validateEndpointsOrFail(c *client.Client, ns, serviceName string, expectedPort int, expectedEndpoints []string) {
for {
endpoints, err := c.Endpoints(ns).Get(serviceName)
if err == nil {