From e9070111119b3f40d779985c3dd4eb192f2859e8 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 18 Sep 2014 16:03:34 -0700 Subject: [PATCH] Core support for ip-per-service --- cluster/gce/config-default.sh | 1 + cluster/gce/config-test.sh | 1 + .../templates/create-dynamic-salt-files.sh | 1 + cluster/gce/util.sh | 1 + cluster/saltbase/salt/apiserver/default | 5 +- cluster/saltbase/salt/kube-proxy/initd | 2 +- cmd/apiserver/apiserver.go | 12 + cmd/integration/integration.go | 28 +- cmd/proxy/proxy.go | 8 +- examples/guestbook-go/README.md | 2 +- examples/guestbook-go/_src/main.go | 2 +- hack/test-cmd.sh | 3 +- pkg/api/types.go | 7 + pkg/api/v1beta1/types.go | 6 + pkg/api/v1beta2/types.go | 6 + pkg/api/v1beta3/types.go | 5 + pkg/kubecfg/resource_printer.go | 6 +- pkg/kubectl/resource_printer.go | 6 +- pkg/master/master.go | 6 +- pkg/proxy/proxier.go | 296 +++++++++++++----- pkg/proxy/proxier_test.go | 150 +++++---- pkg/registry/pod/manifest_factory_test.go | 32 +- pkg/registry/registrytest/service.go | 41 ++- pkg/registry/service/ip_allocator.go | 1 - pkg/registry/service/rest.go | 78 ++++- pkg/registry/service/rest_test.go | 256 +++++++++++++-- 26 files changed, 735 insertions(+), 227 deletions(-) diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index 09595df99f..f822acbd4c 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -31,3 +31,4 @@ MINION_IP_RANGES=($(eval echo "10.244.{1..${NUM_MINIONS}}.0/24")) MINION_SCOPES="compute-rw" # Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default. POLL_SLEEP_INTERVAL=3 +PORTAL_NET="10.0.0.0/16" diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 3c9a12c88e..a5cc238979 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -31,3 +31,4 @@ MINION_IP_RANGES=($(eval echo "10.245.{1..${NUM_MINIONS}}.0/24")) MINION_SCOPES="" # Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default. 
POLL_SLEEP_INTERVAL=3 +PORTAL_NET="10.0.0.0/16" diff --git a/cluster/gce/templates/create-dynamic-salt-files.sh b/cluster/gce/templates/create-dynamic-salt-files.sh index 9255fbf0c2..afea82b88c 100644 --- a/cluster/gce/templates/create-dynamic-salt-files.sh +++ b/cluster/gce/templates/create-dynamic-salt-files.sh @@ -21,6 +21,7 @@ mkdir -p /srv/salt-overlay/pillar cat <<EOF >/srv/salt-overlay/pillar/cluster-params.sls node_instance_prefix: $NODE_INSTANCE_PREFIX +portal_net: $PORTAL_NET EOF mkdir -p /srv/salt-overlay/salt/nginx diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh index 1631820be1..99eed1fda1 100755 --- a/cluster/gce/util.sh +++ b/cluster/gce/util.sh @@ -263,6 +263,7 @@ function kube-up { echo "readonly SERVER_BINARY_TAR_URL='${SERVER_BINARY_TAR_URL}'" echo "readonly SALT_TAR_URL='${SALT_TAR_URL}'" echo "readonly MASTER_HTPASSWD='${htpasswd}'" + echo "readonly PORTAL_NET='${PORTAL_NET}'" grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/create-dynamic-salt-files.sh" grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/download-release.sh" grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/salt-master.sh" diff --git a/cluster/saltbase/salt/apiserver/default b/cluster/saltbase/salt/apiserver/default index d84085fe87..f3a902bbda 100644 --- a/cluster/saltbase/salt/apiserver/default +++ b/cluster/saltbase/salt/apiserver/default @@ -55,5 +55,8 @@ {%- set minion_regexp = "" %} {% endif %} {% endif %} +{% if pillar['portal_net'] is defined %} + {% set portal_net = "-portal_net=" + pillar['portal_net'] %} +{% endif %} -DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}}" +DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}" diff --git a/cluster/saltbase/salt/kube-proxy/initd b/cluster/saltbase/salt/kube-proxy/initd index c1d7cdfc25..9e3e94d7b2 100644 --- a/cluster/saltbase/salt/kube-proxy/initd +++ b/cluster/saltbase/salt/kube-proxy/initd @@ -22,7 +22,7 @@ DAEMON_ARGS="" DAEMON_LOG_FILE=/var/log/$NAME.log PIDFILE=/var/run/$NAME.pid SCRIPTNAME=/etc/init.d/$NAME -DAEMON_USER=kube-proxy +DAEMON_USER=root # Exit if the package is not installed [ -x "$DAEMON" ] || exit 0 diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index a96d39cb98..0aa28abe70 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -64,6 +64,7 @@ var ( machineList util.StringList corsAllowedOriginList util.StringList allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.") + portalNet util.IPNet // TODO: make this a list // TODO: Discover these by pinging the host machines, and rip out these flags. nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node") @@ -75,6 +76,7 @@ func init() { flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.") flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching.
If this list is empty CORS will not be enabled.") + flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.") } func verifyMinionFlags() { @@ -89,6 +91,13 @@ func verifyMinionFlags() { } } +// TODO: Longer term we should read this from some config store, rather than a flag. +func verifyPortalFlags() { + if portalNet.IP == nil { + glog.Fatal("No -portal_net specified") + } +} + func initCloudProvider(name string, configFilePath string) cloudprovider.Interface { var config *os.File @@ -141,6 +150,7 @@ func main() { verflag.PrintAndExitIfRequested() verifyMinionFlags() + verifyPortalFlags() if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) { glog.Fatalf("specify either -etcd_servers or -etcd_config") @@ -172,6 +182,7 @@ func main() { glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) } + n := net.IPNet(portalNet) m := master.New(&master.Config{ Client: client, Cloud: cloud, @@ -188,6 +199,7 @@ func main() { resources.Memory: util.NewIntOrStringFromInt(*nodeMemory), }, }, + PortalNet: &n, }) mux := http.NewServeMux() diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 6e56dcf5de..c24384cc27 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -123,11 +123,16 @@ func startComponents(manifestURL string) (apiServerURL string) { } // Master + _, portalNet, err := net.ParseCIDR("10.0.0.0/24") + if err != nil { + glog.Fatalf("Unable to parse CIDR: %v", err) + } m := master.New(&master.Config{ Client: cl, EtcdHelper: helper, Minions: machineList, PodInfoGetter: fakePodInfoGetter{}, + PortalNet: portalNet, }) mux := http.NewServeMux() apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(mux, "/api/v1beta1") @@ -349,18 +354,33 @@ func runServiceTest(client *client.Client) { if err := wait.Poll(time.Second, time.Second*20, podExists(client, ctx, pod.ID)); err != nil { glog.Fatalf("FAILED: pod never started running %v", err) } - svc := api.Service{ + svc1 := api.Service{ TypeMeta: api.TypeMeta{ID: "service1"}, Selector: map[string]string{ "name": "thisisalonglabel", }, Port: 8080, } - _, err = client.CreateService(ctx, &svc) + _, err = client.CreateService(ctx, &svc1) if err != nil { - glog.Fatalf("Failed to create service: %v, %v", svc, err) + glog.Fatalf("Failed to create service: %v, %v", svc1, err) } - if err := wait.Poll(time.Second, time.Second*10, endpointsSet(client, ctx, svc.ID, 1)); err != nil { + if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, ctx, svc1.ID, 1)); err != nil { + glog.Fatalf("FAILED: unexpected endpoints: %v", err) + } + // A second service with the same port. 
+ svc2 := api.Service{ + TypeMeta: api.TypeMeta{ID: "service2"}, + Selector: map[string]string{ + "name": "thisisalonglabel", + }, + Port: 8080, + } + _, err = client.CreateService(ctx, &svc2) + if err != nil { + glog.Fatalf("Failed to create service: %v, %v", svc2, err) + } + if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, ctx, svc2.ID, 1)); err != nil { glog.Fatalf("FAILED: unexpected endpoints: %v", err) } glog.Info("Service test passed.") diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 4945cf5c08..5731262e80 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -25,6 +25,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" @@ -40,7 +42,7 @@ var ( func init() { client.BindClientConfigFlags(flag.CommandLine, clientConfig) flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config") - flag.Var(&bindAddress, "bind_address", "The address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") + flag.Var(&bindAddress, "bind_address", "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") } func main() { @@ -97,12 +99,12 @@ func main() { } loadBalancer := proxy.NewLoadBalancerRR() - proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress)) + proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress), iptables.New(exec.New())) // Wire proxier to handle changes to services serviceConfig.RegisterHandler(proxier) // And wire loadBalancer to handle changes to endpoints to services endpointsConfig.RegisterHandler(loadBalancer) // Just loop forever for now... - select {} + proxier.SyncLoop() } diff --git a/examples/guestbook-go/README.md b/examples/guestbook-go/README.md index 2e83713608..63226331a5 100644 --- a/examples/guestbook-go/README.md +++ b/examples/guestbook-go/README.md @@ -85,7 +85,7 @@ redis-slave-controller gurpartap/redis name=redis,role=slave 2 The redis slave configures itself by looking for the Kubernetes service environment variables in the container environment. 
In particular, the redis slave is started with the following command: ```shell -redis-server --slaveof $SERVICE_HOST $REDIS_MASTER_SERVICE_PORT +redis-server --slaveof $REDIS_MASTER_SERVICE_HOST $REDIS_MASTER_SERVICE_PORT ``` Once that's up you can list the pods in the cluster, to verify that the master and slaves are running: diff --git a/examples/guestbook-go/_src/main.go b/examples/guestbook-go/_src/main.go index 78eaf8e06e..45936fb26d 100644 --- a/examples/guestbook-go/_src/main.go +++ b/examples/guestbook-go/_src/main.go @@ -71,7 +71,7 @@ func HandleError(result interface{}, err error) (r interface{}) { } func main() { - pool = simpleredis.NewConnectionPoolHost(os.Getenv("SERVICE_HOST") + ":" + os.Getenv("REDIS_MASTER_SERVICE_PORT")) + pool = simpleredis.NewConnectionPoolHost(os.Getenv("REDIS_MASTER_SERVICE_HOST") + ":" + os.Getenv("REDIS_MASTER_SERVICE_PORT")) defer pool.Close() r := mux.NewRouter() diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index ca5e1f5090..d5fb2cd3a7 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -71,7 +71,8 @@ ${GO_OUT}/apiserver \ --port="${API_PORT}" \ --etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \ --machines="127.0.0.1" \ - --minion_port=${KUBELET_PORT} 1>&2 & + --minion_port=${KUBELET_PORT} \ + --portal_net="10.0.0.0/24" 1>&2 & APISERVER_PID=$! wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: " diff --git a/pkg/api/types.go b/pkg/api/types.go index ad47551e85..5c828b267a 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -423,6 +423,13 @@ type Service struct { // ContainerPort is the name of the port on the container to direct traffic to. // Optional, if unspecified use the first port on the container. ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"` + + // PortalIP is assigned by the master. If specified by the user it will be ignored. + // TODO: This is awkward - if we had a BoundService, it would be better factored. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If specified by the user it will be ignored. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // Endpoints is a collection of endpoints that implement the actual service, for example: diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index f8c136e29c..468dde2c1d 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -451,6 +451,12 @@ type Service struct { // ContainerPort is the name of the port on the container to direct traffic to. // Optional, if unspecified use the first port on the container. ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"` + + // PortalIP is assigned by the master. If specified by the user it will be ignored. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If specified by the user it will be ignored. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // Endpoints is a collection of endpoints that implement the actual service, for example: diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 93f2710bd1..6375c0450a 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -416,6 +416,12 @@ type Service struct { // ContainerPort is the name of the port on the container to direct traffic to. // Optional, if unspecified use the first port on the container. 
ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"` + + // PortalIP is assigned by the master. If specified by the user it will be ignored. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If specified by the user it will be ignored. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // Endpoints is a collection of endpoints that implement the actual service, for example: diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index f2b89fbd2c..23016bd7c9 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -570,6 +570,11 @@ type ReplicationControllerList struct { // ServiceStatus represents the current status of a service type ServiceStatus struct { + // PortalIP is assigned by the master. + PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"` + + // ProxyPort is assigned by the master. If 0, the proxy will choose an ephemeral port. + ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"` } // ServiceSpec describes the attributes that a user creates on a service diff --git a/pkg/kubecfg/resource_printer.go b/pkg/kubecfg/resource_printer.go index f54532a3f9..e528f82177 100644 --- a/pkg/kubecfg/resource_printer.go +++ b/pkg/kubecfg/resource_printer.go @@ -140,7 +140,7 @@ func (h *HumanReadablePrinter) validatePrintHandlerFunc(printFunc reflect.Value) var podColumns = []string{"ID", "Image(s)", "Host", "Labels", "Status"} var replicationControllerColumns = []string{"ID", "Image(s)", "Selector", "Replicas"} -var serviceColumns = []string{"ID", "Labels", "Selector", "Port"} +var serviceColumns = []string{"ID", "Labels", "Selector", "IP", "Port"} var minionColumns = []string{"Minion identifier"} var statusColumns = []string{"Status"} var eventColumns = []string{"Name", "Kind", "Status", "Reason", "Message"} @@ -226,8 +226,8 @@ func printReplicationControllerList(list *api.ReplicationControllerList, w io.Wr } func printService(svc *api.Service, w io.Writer) error { - _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), - labels.Set(svc.Selector), svc.Port) + _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), + labels.Set(svc.Selector), svc.PortalIP, svc.Port) return err } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index c865291315..fb5bb3f168 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -152,7 +152,7 @@ func (h *HumanReadablePrinter) validatePrintHandlerFunc(printFunc reflect.Value) var podColumns = []string{"ID", "IMAGE(S)", "HOST", "LABELS", "STATUS"} var replicationControllerColumns = []string{"ID", "IMAGE(S)", "SELECTOR", "REPLICAS"} -var serviceColumns = []string{"ID", "LABELS", "SELECTOR", "PORT"} +var serviceColumns = []string{"ID", "LABELS", "SELECTOR", "IP", "PORT"} var minionColumns = []string{"ID"} var statusColumns = []string{"STATUS"} @@ -222,8 +222,8 @@ func printReplicationControllerList(list *api.ReplicationControllerList, w io.Wr } func printService(svc *api.Service, w io.Writer) error { - _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), - labels.Set(svc.Selector), svc.Port) + _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels), + labels.Set(svc.Selector), svc.PortalIP, svc.Port) return err } diff --git a/pkg/master/master.go b/pkg/master/master.go index 366e8a3557..37bb8c6915 100644 --- 
a/pkg/master/master.go +++ b/pkg/master/master.go @@ -17,6 +17,7 @@ limitations under the License. package master import ( + "net" "net/http" "time" @@ -54,6 +55,7 @@ type Config struct { MinionRegexp string PodInfoGetter client.PodInfoGetter NodeResources api.NodeResources + PortalNet *net.IPNet } // Master contains state for a Kubernetes cluster master/api server. @@ -67,6 +69,7 @@ type Master struct { eventRegistry generic.Registry storage map[string]apiserver.RESTStorage client *client.Client + portalNet *net.IPNet } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -98,6 +101,7 @@ func New(c *Config) *Master { eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), minionRegistry: minionRegistry, client: c.Client, + portalNet: c.PortalNet, } m.init(c) return m @@ -137,7 +141,7 @@ func (m *Master) init(c *Config) { Minions: m.client, }), "replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry), - "services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry), + "services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet), "endpoints": endpoint.NewREST(m.endpointRegistry), "minions": minion.NewREST(m.minionRegistry), "events": event.NewREST(m.eventRegistry), diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 720992dc1e..539691a097 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -27,16 +27,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" "github.com/golang/glog" ) type serviceInfo struct { - port int - protocol api.Protocol - socket proxySocket - timeout time.Duration - mu sync.Mutex // protects active - active bool + portalIP net.IP + portalPort int + protocol api.Protocol + proxyPort int + socket proxySocket + timeout time.Duration + mu sync.Mutex // protects active + active bool } func (si *serviceInfo) isActive() bool { @@ -64,7 +67,7 @@ type proxySocket interface { // on the impact of calling Close while sessions are active. Close() error // ProxyLoop proxies incoming connections for the specified service to the service endpoints. - ProxyLoop(service string, proxier *Proxier) + ProxyLoop(service string, info *serviceInfo, proxier *Proxier) } // tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called, @@ -73,12 +76,7 @@ type tcpProxySocket struct { net.Listener } -func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { - info, found := proxier.getServiceInfo(service) - if !found { - glog.Errorf("Failed to find service: %s", service) - return - } +func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { for { if !info.isActive() { break @@ -97,7 +95,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) { inConn.Close() continue } - glog.V(3).Infof("Mapped service %s to endpoint %s", service, endpoint) + glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. 
outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout) @@ -118,22 +116,23 @@ func proxyTCP(in, out *net.TCPConn) { wg.Add(2) glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - go copyBytes(in, out, &wg) - go copyBytes(out, in, &wg) + go copyBytes("from backend", in, out, &wg) + go copyBytes("to backend", out, in, &wg) wg.Wait() in.Close() out.Close() } -func copyBytes(in, out *net.TCPConn, wg *sync.WaitGroup) { +func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { defer wg.Done() - glog.V(4).Infof("Copying from %v <-> %v <-> %v <-> %v", - in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - if _, err := io.Copy(in, out); err != nil { + glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) + n, err := io.Copy(dest, src) + if err != nil { glog.Errorf("I/O error: %v", err) } - in.CloseRead() - out.CloseWrite() + glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) + dest.CloseWrite() + src.CloseRead() } // udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called, @@ -157,12 +156,7 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) { - info, found := proxier.getServiceInfo(service) - if !found { - glog.Errorf("Failed to find service: %s", service) - return - } +func (udp *udpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) { activeClients := newClientCache() var buffer [4096]byte // 4KiB should be enough for most whole-packets for { @@ -220,7 +214,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne glog.Errorf("Couldn't find an endpoint for %s %v", service, err) return nil, err } - glog.V(4).Infof("Mapped service %s to endpoint %s", service, endpoint) + glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout) if err != nil { // TODO: Try another endpoint? @@ -237,6 +231,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne } // This function is expected to be called as a goroutine. +// TODO: Track and log bytes copied, like TCP func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) { defer svrConn.Close() var buffer [4096]byte @@ -302,19 +297,64 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er // Proxier is a simple proxy for TCP connections between a localhost:lport // and services that provide the actual implementations. type Proxier struct { - loadBalancer LoadBalancer - mu sync.Mutex // protects serviceMap - serviceMap map[string]*serviceInfo - address net.IP + loadBalancer LoadBalancer + mu sync.Mutex // protects serviceMap + serviceMap map[string]*serviceInfo + listenAddress net.IP + iptables iptables.Interface } -// NewProxier returns a new Proxier given a LoadBalancer and an -// address on which to listen -func NewProxier(loadBalancer LoadBalancer, address net.IP) *Proxier { +// NewProxier returns a new Proxier given a LoadBalancer and an address on +// which to listen. Because of the iptables logic, it is assumed that there +// is only a single Proxier active on a machine.
+func NewProxier(loadBalancer LoadBalancer, listenAddress net.IP, iptables iptables.Interface) *Proxier { + glog.Infof("Initializing iptables") + // Set up the iptables foundations we need. + if err := iptablesInit(iptables); err != nil { + glog.Errorf("Failed to initialize iptables: %s", err) + return nil + } + // Flush old iptables rules (since the bound ports will be invalid after a restart). + // When OnUpdate() is first called, the rules will be recreated. + if err := iptablesFlush(iptables); err != nil { + glog.Errorf("Failed to flush iptables: %s", err) + return nil + } return &Proxier{ - loadBalancer: loadBalancer, - serviceMap: make(map[string]*serviceInfo), - address: address, + loadBalancer: loadBalancer, + serviceMap: make(map[string]*serviceInfo), + listenAddress: listenAddress, + iptables: iptables, + } +} + +// The periodic interval for checking the state of things. +const syncInterval = 5 * time.Second + +// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. +func (proxier *Proxier) SyncLoop() { + for { + select { + case <-time.After(syncInterval): + glog.V(2).Infof("Periodic sync") + if err := iptablesInit(proxier.iptables); err != nil { + glog.Errorf("Failed to ensure iptables: %s", err) + } + proxier.ensurePortals() + } + } +} + +// Ensure that portals exist for all services. +func (proxier *Proxier) ensurePortals() { + proxier.mu.Lock() + defer proxier.mu.Unlock() + // NB: This does not remove rules that should not be present. + for name, info := range proxier.serviceMap { + err := proxier.openPortal(name, info) + if err != nil { + glog.Errorf("Failed to ensure portal for %q: %s", name, err) + } } } @@ -330,7 +370,6 @@ func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) err if !info.setActive(false) { return nil } - glog.V(3).Infof("Removing service: %s", service) delete(proxier.serviceMap, service) return info.socket.Close() } @@ -348,39 +387,40 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) { proxier.serviceMap[service] = info } -// addServiceOnUnusedPort starts listening for a new service, returning the -// port it's using. For testing on a system with unknown ports used. The timeout only applies to UDP +// addServiceOnPort starts listening for a new service, returning the serviceInfo. +// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP // connections, for now. 
-func (proxier *Proxier) addServiceOnUnusedPort(service string, protocol api.Protocol, timeout time.Duration) (string, error) { - sock, err := newProxySocket(protocol, proxier.address, 0) +func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) { + sock, err := newProxySocket(protocol, proxier.listenAddress, proxyPort) if err != nil { - return "", err + return nil, err } - _, port, err := net.SplitHostPort(sock.Addr().String()) + _, portStr, err := net.SplitHostPort(sock.Addr().String()) if err != nil { - return "", err + sock.Close() + return nil, err } - portNum, err := strconv.Atoi(port) + portNum, err := strconv.Atoi(portStr) if err != nil { - return "", err + sock.Close() + return nil, err } - proxier.setServiceInfo(service, &serviceInfo{ - port: portNum, - protocol: protocol, - active: true, - socket: sock, - timeout: timeout, - }) - proxier.startAccepting(service, sock) - return port, nil -} + si := &serviceInfo{ + proxyPort: portNum, + protocol: protocol, + active: true, + socket: sock, + timeout: timeout, + } + proxier.setServiceInfo(service, si) -func (proxier *Proxier) startAccepting(service string, sock proxySocket) { - glog.V(1).Infof("Listening for %s on %s:%s", service, sock.Addr().Network(), sock.Addr().String()) - go func(service string, proxier *Proxier) { + glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum) + go func(service string, info *serviceInfo, proxier *Proxier) { defer util.HandleCrash() - sock.ProxyLoop(service, proxier) - }(service, proxier) + sock.ProxyLoop(service, info, proxier) + }(service, si, proxier) + + return si, nil } // How long we leave idle UDP connections open. @@ -395,39 +435,133 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { for _, service := range services { activeServices.Insert(service.ID) info, exists := proxier.getServiceInfo(service.ID) + serviceIP := net.ParseIP(service.PortalIP) // TODO: check health of the socket? What if ProxyLoop exited? 
- if exists && info.isActive() && info.port == service.Port { + if exists && info.isActive() && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) { continue } - if exists && info.port != service.Port { - err := proxier.stopProxy(service.ID, info) + if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) { + glog.V(4).Infof("Something changed for service %q: stopping it", service.ID) + err := proxier.closePortal(service.ID, info) if err != nil { - glog.Errorf("error stopping %s: %v", service.ID, err) + glog.Errorf("Failed to close portal for %q: %s", service.ID, err) + } + err = proxier.stopProxy(service.ID, info) + if err != nil { + glog.Errorf("Failed to stop service %q: %s", service.ID, err) } } - glog.V(3).Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port) - sock, err := newProxySocket(service.Protocol, proxier.address, service.Port) + glog.V(1).Infof("Adding new service %q at %s:%d/%s (local :%d)", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort) + info, err := proxier.addServiceOnPort(service.ID, service.Protocol, service.ProxyPort, udpIdleTimeout) if err != nil { - glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err) + glog.Errorf("Failed to start proxy for %q: %+v", service.ID, err) continue } - proxier.setServiceInfo(service.ID, &serviceInfo{ - port: service.Port, - protocol: service.Protocol, - active: true, - socket: sock, - timeout: udpIdleTimeout, - }) - proxier.startAccepting(service.ID, sock) + info.portalIP = serviceIP + info.portalPort = service.Port + err = proxier.openPortal(service.ID, info) + if err != nil { + glog.Errorf("Failed to open portal for %q: %s", service.ID, err) + } } proxier.mu.Lock() defer proxier.mu.Unlock() for name, info := range proxier.serviceMap { if !activeServices.Has(name) { - err := proxier.stopProxyInternal(name, info) + glog.V(1).Infof("Stopping service %q", name) + err := proxier.closePortal(name, info) if err != nil { - glog.Errorf("error stopping %s: %v", name, err) + glog.Errorf("Failed to close portal for %q: %s", name, err) + } + err = proxier.stopProxyInternal(name, info) + if err != nil { + glog.Errorf("Failed to stop service %q: %s", name, err) } } } } + +func (proxier *Proxier) openPortal(service string, info *serviceInfo) error { + args := iptablesPortalArgs(info.portalIP, info.portalPort, proxier.listenAddress, info.proxyPort, service) + existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesProxyChain, args...) + if err != nil { + glog.Errorf("Failed to install iptables %s rule for service %q", iptablesProxyChain, service) + return err + } + if !existed { + glog.Infof("Opened iptables portal for service %q on %s:%d", service, info.portalIP, info.portalPort) + } + return nil +} + +func (proxier *Proxier) closePortal(service string, info *serviceInfo) error { + args := iptablesPortalArgs(info.portalIP, info.portalPort, proxier.listenAddress, info.proxyPort, service) + if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesProxyChain, args...); err != nil { + glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesProxyChain, service) + return err + } + glog.Infof("Closed iptables portal for service %q", service) + return nil +} + +var iptablesProxyChain iptables.Chain = "KUBE-PROXY" + +// Ensure that the iptables infrastructure we use is set up. This can safely be called periodically. 
+func iptablesInit(ipt iptables.Interface) error { + // TODO: There is almost certainly room for optimization here. E.g., if + // we knew the portal_net CIDR we could fast-track outbound packets not + // destined for a service. There's probably more, help wanted. + if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesProxyChain); err != nil { + return err + } + if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesProxyChain)); err != nil { + return err + } + if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesProxyChain)); err != nil { + return err + } + return nil +} + +// Flush all of our custom iptables rules. +func iptablesFlush(ipt iptables.Interface) error { + return ipt.FlushChain(iptables.TableNAT, iptablesProxyChain) +} + +// Used below. +var zeroIP = net.ParseIP("0.0.0.0") +var localhostIP = net.ParseIP("127.0.0.1") + +// Build a slice of iptables args for a portal rule. +func iptablesPortalArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service string) []string { + args := []string{ + "-m", "comment", + "--comment", service, + "-p", "tcp", + "-d", destIP.String(), + "--dport", fmt.Sprintf("%d", destPort), + } + // This is tricky. If the proxy is bound (see Proxier.listenAddress) + // to 0.0.0.0 ("any interface") or 127.0.0.1, we can use REDIRECT, + // which will bring packets back to the host's loopback interface. If + // the proxy is bound to any other interface, then it is not listening + // on the host's loopback, so we have to use DNAT to that specific + // IP. We cannot simply use DNAT to 127.0.0.1 in the first case + // because from within a container, 127.0.0.1 is the container's + // loopback interface, not the host's. + // + // Why would anyone bind to an address that is not inclusive of + // localhost? Apparently some cloud environments have their public IP + // exposed as a real network interface AND do not have firewalling. We + // don't want to expose everything out to the world. + // + // Unfortunately, I don't know of any way to listen on some (N > 1) + // interfaces but not ALL interfaces, short of doing it manually, and + // this is simpler than that.
+ if proxyIP.Equal(zeroIP) || proxyIP.Equal(localhostIP) { + args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort)) + } else { + args = append(args, "-j", "DNAT", "--to-destination", fmt.Sprintf("%s:%d", proxyIP.String(), proxyPort)) + } + return args +} diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index b3af1a0f14..809d7c5de7 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -28,23 +28,28 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" ) -func waitForClosedPortTCP(p *Proxier, proxyPort string) error { +func joinHostPort(host string, port int) string { + return net.JoinHostPort(host, fmt.Sprintf("%d", port)) +} + +func waitForClosedPortTCP(p *Proxier, proxyPort int) error { for i := 0; i < 50; i++ { - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", proxyPort)) if err != nil { return nil } conn.Close() time.Sleep(1 * time.Millisecond) } - return fmt.Errorf("port %s still open", proxyPort) + return fmt.Errorf("port %d still open", proxyPort) } -func waitForClosedPortUDP(p *Proxier, proxyPort string) error { +func waitForClosedPortUDP(p *Proxier, proxyPort int) error { for i := 0; i < 50; i++ { - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", proxyPort)) if err != nil { return nil } @@ -66,7 +71,26 @@ func waitForClosedPortUDP(p *Proxier, proxyPort string) error { conn.Close() time.Sleep(1 * time.Millisecond) } - return fmt.Errorf("port %s still open", proxyPort) + return fmt.Errorf("port %d still open", proxyPort) +} + +// The iptables logic has to be tested in a proper end-to-end test, so this just stubs everything out. 
+type fakeIptables struct{} + +func (fake *fakeIptables) EnsureChain(table iptables.Table, chain iptables.Chain) (bool, error) { + return false, nil +} + +func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain) error { + return nil +} + +func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) { + return false, nil +} + +func (fake *fakeIptables) DeleteRule(table iptables.Table, chain iptables.Chain, args ...string) error { + return nil } var tcpServerPort string @@ -99,9 +123,9 @@ func init() { go udp.Loop() } -func testEchoTCP(t *testing.T, address, port string) { +func testEchoTCP(t *testing.T, address string, port int) { path := "aaaaa" - res, err := http.Get("http://" + address + ":" + port + "/" + path) + res, err := http.Get("http://" + address + ":" + fmt.Sprintf("%d", port) + "/" + path) if err != nil { t.Fatalf("error connecting to server: %v", err) } @@ -115,10 +139,10 @@ func testEchoTCP(t *testing.T, address, port string) { } } -func testEchoUDP(t *testing.T, address, port string) { +func testEchoUDP(t *testing.T, address string, port int) { data := "abc123" - conn, err := net.Dial("udp", net.JoinHostPort(address, port)) + conn, err := net.Dial("udp", joinHostPort(address, port)) if err != nil { t.Fatalf("error connecting to server: %v", err) } @@ -144,13 +168,13 @@ func TestTCPProxy(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoTCP(t, "127.0.0.1", proxyPort) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxy(t *testing.T) { @@ -162,13 +186,13 @@ func TestUDPProxy(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - testEchoUDP(t, "127.0.0.1", proxyPort) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } // Helper: Stops the proxy for the named service. @@ -189,13 +213,13 @@ func TestTCPProxyStop(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -203,7 +227,7 @@ func TestTCPProxyStop(t *testing.T) { stopProxyByName(p, "echo") // Wait for the port to really close. 
- if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -217,13 +241,13 @@ func TestUDPProxyStop(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } @@ -231,7 +255,7 @@ func TestUDPProxyStop(t *testing.T) { stopProxyByName(p, "echo") // Wait for the port to really close. - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -245,20 +269,20 @@ func TestTCPProxyUpdateDelete(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -272,20 +296,20 @@ func TestUDPProxyUpdateDelete(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } } @@ -299,27 +323,26 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - proxyPortNum, _ := strconv.Atoi(proxyPort) p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: proxyPortNum, Protocol: "TCP"}, + 
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "TCP"}, }) - testEchoTCP(t, "127.0.0.1", proxyPort) + testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort) } func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { @@ -331,27 +354,26 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } - conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort)) + conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("error connecting to proxy: %v", err) } conn.Close() p.OnUpdate([]api.Service{}) - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } - proxyPortNum, _ := strconv.Atoi(proxyPort) p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: proxyPortNum, Protocol: "UDP"}, + {TypeMeta: api.TypeMeta{ID: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "UDP"}, }) - testEchoUDP(t, "127.0.0.1", proxyPort) + testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort) } func TestTCPProxyUpdatePort(t *testing.T) { @@ -363,36 +385,36 @@ func TestTCPProxyUpdatePort(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0) + svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } // add a new dummy listener in order to get a port that is free l, _ := net.Listen("tcp", ":0") - _, newPort, _ := net.SplitHostPort(l.Addr().String()) - newPortNum, _ := strconv.Atoi(newPort) + _, newPortStr, _ := net.SplitHostPort(l.Addr().String()) + newPort, _ := strconv.Atoi(newPortStr) l.Close() // Wait for the socket to actually get free. if err := waitForClosedPortTCP(p, newPort); err != nil { t.Fatalf(err.Error()) } - if proxyPort == newPort { - t.Errorf("expected difference, got %s %s", newPort, proxyPort) + if svcInfo.proxyPort == newPort { + t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort) } p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPortNum, Protocol: "TCP"}, + {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "TCP"}, }) - if err := waitForClosedPortTCP(p, proxyPort); err != nil { + if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } testEchoTCP(t, "127.0.0.1", newPort) // Ensure the old port is released and re-usable. 
- l, err = net.Listen("tcp", net.JoinHostPort("", proxyPort)) + l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("can't claim released port: %s", err) } @@ -408,36 +430,36 @@ func TestUDPProxyUpdatePort(t *testing.T) { }, }) - p := NewProxier(lb, net.ParseIP("127.0.0.1")) + p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{}) - proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second) + svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second) if err != nil { t.Fatalf("error adding new service: %#v", err) } // add a new dummy listener in order to get a port that is free pc, _ := net.ListenPacket("udp", ":0") - _, newPort, _ := net.SplitHostPort(pc.LocalAddr().String()) - newPortNum, _ := strconv.Atoi(newPort) + _, newPortStr, _ := net.SplitHostPort(pc.LocalAddr().String()) + newPort, _ := strconv.Atoi(newPortStr) pc.Close() // Wait for the socket to actually get free. if err := waitForClosedPortUDP(p, newPort); err != nil { t.Fatalf(err.Error()) } - if proxyPort == newPort { - t.Errorf("expected difference, got %s %s", newPort, proxyPort) + if svcInfo.proxyPort == newPort { + t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort) } p.OnUpdate([]api.Service{ - {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPortNum, Protocol: "UDP"}, + {TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "UDP"}, }) - if err := waitForClosedPortUDP(p, proxyPort); err != nil { + if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil { t.Fatalf(err.Error()) } testEchoUDP(t, "127.0.0.1", newPort) // Ensure the old port is released and re-usable. - pc, err = net.ListenPacket("udp", net.JoinHostPort("", proxyPort)) + pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort)) if err != nil { t.Fatalf("can't claim released port: %s", err) } diff --git a/pkg/registry/pod/manifest_factory_test.go b/pkg/registry/pod/manifest_factory_test.go index af1ccec3a8..e24d127c29 100644 --- a/pkg/registry/pod/manifest_factory_test.go +++ b/pkg/registry/pod/manifest_factory_test.go @@ -48,10 +48,8 @@ func TestMakeManifestNoServices(t *testing.T) { } container := manifest.Containers[0] - if len(container.Env) != 1 || - container.Env[0].Name != "SERVICE_HOST" || - container.Env[0].Value != "machine" { - t.Errorf("Expected one env vars, got: %#v", manifest) + if len(container.Env) != 0 { + t.Errorf("Expected zero env vars, got: %#v", manifest) } if manifest.ID != "foobar" { t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID) @@ -69,6 +67,7 @@ func TestMakeManifestServices(t *testing.T) { Kind: util.IntstrInt, IntVal: 900, }, + PortalIP: "1.2.3.4", }, }, }, @@ -96,7 +95,7 @@ func TestMakeManifestServices(t *testing.T) { envs := []api.EnvVar{ { Name: "TEST_SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, { Name: "TEST_SERVICE_PORT", @@ -104,11 +103,11 @@ func TestMakeManifestServices(t *testing.T) { }, { Name: "TEST_PORT", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP_PROTO", @@ -120,11 +119,7 @@ func TestMakeManifestServices(t *testing.T) { }, { Name: "TEST_PORT_8080_TCP_ADDR", - Value: "machine", - }, - { - Name: "SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, } if len(container.Env) != len(envs) { @@ -149,6 +144,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { Kind: util.IntstrInt, IntVal: 900, }, + PortalIP: 
"1.2.3.4", }, }, }, @@ -186,7 +182,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, { Name: "TEST_SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, { Name: "TEST_SERVICE_PORT", @@ -194,11 +190,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, { Name: "TEST_PORT", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP", - Value: "tcp://machine:8080", + Value: "tcp://1.2.3.4:8080", }, { Name: "TEST_PORT_8080_TCP_PROTO", @@ -210,11 +206,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, { Name: "TEST_PORT_8080_TCP_ADDR", - Value: "machine", - }, - { - Name: "SERVICE_HOST", - Value: "machine", + Value: "1.2.3.4", }, } if len(container.Env) != len(envs) { diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 8578d738a2..2871620c75 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -17,6 +17,8 @@ limitations under the License. package registrytest import ( + "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -27,6 +29,7 @@ func NewServiceRegistry() *ServiceRegistry { } type ServiceRegistry struct { + mu sync.Mutex List api.ServiceList Service *api.Service Err error @@ -39,48 +42,84 @@ type ServiceRegistry struct { } func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) { - return &r.List, r.Err + r.mu.Lock() + defer r.mu.Unlock() + + // Return by copy to avoid data races + res := new(api.ServiceList) + *res = r.List + res.Items = append([]api.Service{}, r.List.Items...) + return res, r.Err } func (r *ServiceRegistry) CreateService(ctx api.Context, svc *api.Service) error { + r.mu.Lock() + defer r.mu.Unlock() + r.Service = svc r.List.Items = append(r.List.Items, *svc) return r.Err } func (r *ServiceRegistry) GetService(ctx api.Context, id string) (*api.Service, error) { + r.mu.Lock() + defer r.mu.Unlock() + r.GottenID = id return r.Service, r.Err } func (r *ServiceRegistry) DeleteService(ctx api.Context, id string) error { + r.mu.Lock() + defer r.mu.Unlock() + r.DeletedID = id + r.Service = nil return r.Err } func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error { + r.mu.Lock() + defer r.mu.Unlock() + r.UpdatedID = svc.ID + r.Service = svc return r.Err } func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion string) (watch.Interface, error) { + r.mu.Lock() + defer r.mu.Unlock() + return nil, r.Err } func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) { + r.mu.Lock() + defer r.mu.Unlock() + return &r.EndpointsList, r.Err } func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) { + r.mu.Lock() + defer r.mu.Unlock() + r.GottenID = id return &r.Endpoints, r.Err } func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error { + r.mu.Lock() + defer r.mu.Unlock() + r.Endpoints = *e return r.Err } func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) { + r.mu.Lock() + defer r.mu.Unlock() + return nil, r.Err } diff --git a/pkg/registry/service/ip_allocator.go b/pkg/registry/service/ip_allocator.go index e6717a7540..f48511803f 100644 --- a/pkg/registry/service/ip_allocator.go +++ 
b/pkg/registry/service/ip_allocator.go @@ -32,7 +32,6 @@ type ipAllocator struct { } // newIPAllocator creates and initializes a new ipAllocator object. -// FIXME: resync from storage at startup. func newIPAllocator(subnet *net.IPNet) *ipAllocator { if subnet == nil || subnet.IP == nil || subnet.Mask == nil { return nil diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 3742c31dff..42f26810d5 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -19,6 +19,7 @@ package service import ( "fmt" "math/rand" + "net" "strconv" "strings" @@ -32,21 +33,49 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" ) // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - registry Registry - cloud cloudprovider.Interface - machines minion.Registry + registry Registry + cloud cloudprovider.Interface + machines minion.Registry + portalMgr *ipAllocator } // NewREST returns a new REST. -func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry) *REST { +func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST { + // TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd) + ipa := newIPAllocator(portalNet) + reloadIPsFromStorage(ipa, registry) + return &REST{ - registry: registry, - cloud: cloud, - machines: machines, + registry: registry, + cloud: cloud, + machines: machines, + portalMgr: ipa, + } +} + +// Helper: mark all previously allocated IPs in the allocator. +func reloadIPsFromStorage(ipa *ipAllocator, registry Registry) { + services, err := registry.ListServices(api.NewContext()) + if err != nil { + // This is really bad. + glog.Errorf("can't list services to init service REST: %s", err) + return + } + for i := range services.Items { + s := &services.Items[i] + if s.PortalIP == "" { + glog.Warningf("service %q has no PortalIP", s.ID) + continue + } + if err := ipa.Allocate(net.ParseIP(s.PortalIP)); err != nil { + // This is really bad. + glog.Errorf("service %q PortalIP %s could not be allocated: %s", s.ID, s.PortalIP, err) + } } } @@ -61,9 +90,16 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje srv.CreationTimestamp = util.Now() + if ip, err := rs.portalMgr.AllocateNext(); err != nil { + return nil, err + } else { + srv.PortalIP = ip.String() + } + return apiserver.MakeAsync(func() (runtime.Object, error) { // TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers // correctly no matter what http operations happen. + srv.ProxyPort = 0 if srv.CreateExternalLoadBalancer { if rs.cloud == nil { return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.") @@ -88,6 +124,9 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Obje if err != nil { return nil, err } + // External load-balancers require a known port for the service proxy. + // TODO: If we end up brokering HostPorts between Pods and Services, this can be any port.
+ srv.ProxyPort = srv.Port } err := rs.registry.CreateService(ctx, srv) if err != nil { @@ -110,6 +149,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error if err != nil { return nil, err } + rs.portalMgr.Release(net.ParseIP(service.PortalIP)) return apiserver.MakeAsync(func() (runtime.Object, error) { rs.deleteExternalLoadBalancer(service) return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id) @@ -161,16 +201,13 @@ func GetServiceEnvironmentVariables(ctx api.Context, registry Registry, machine for _, service := range services.Items { // Host name := makeEnvVariableName(service.ID) + "_SERVICE_HOST" - result = append(result, api.EnvVar{Name: name, Value: machine}) + result = append(result, api.EnvVar{Name: name, Value: service.PortalIP}) // Port name = makeEnvVariableName(service.ID) + "_SERVICE_PORT" result = append(result, api.EnvVar{Name: name, Value: strconv.Itoa(service.Port)}) // Docker-compatible vars. - result = append(result, makeLinkVariables(service, machine)...) + result = append(result, makeLinkVariables(service)...) } - // The 'SERVICE_HOST' variable is deprecated. - // TODO(thockin): get rid of it once ip-per-service is in and "deployed". - result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine}) return result, nil } @@ -183,8 +220,15 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje return nil, errors.NewInvalid("service", srv.ID, errs) } return apiserver.MakeAsync(func() (runtime.Object, error) { + cur, err := rs.registry.GetService(ctx, srv.ID) + if err != nil { + return nil, err + } + // Copy over non-user fields. + srv.PortalIP = cur.PortalIP + srv.ProxyPort = cur.ProxyPort // TODO: check to see if external load balancer status changed - err := rs.registry.UpdateService(ctx, srv) + err = rs.registry.UpdateService(ctx, srv) if err != nil { return nil, err } @@ -234,7 +278,7 @@ func makeEnvVariableName(str string) string { return strings.ToUpper(strings.Replace(str, "-", "_", -1)) } -func makeLinkVariables(service api.Service, machine string) []api.EnvVar { +func makeLinkVariables(service api.Service) []api.EnvVar { prefix := makeEnvVariableName(service.ID) protocol := string(api.ProtocolTCP) if service.Protocol != "" { @@ -244,11 +288,11 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar { return []api.EnvVar{ { Name: prefix + "_PORT", - Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), machine, service.Port), + Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), service.PortalIP, service.Port), }, { Name: portPrefix, - Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), machine, service.Port), + Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), service.PortalIP, service.Port), }, { Name: portPrefix + "_PROTO", @@ -260,7 +304,7 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar { }, { Name: portPrefix + "_ADDR", - Value: machine, + Value: service.PortalIP, }, } } diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 1e82e197b8..5907146b01 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -18,6 +18,7 @@ package service import ( "fmt" + "net" "reflect" "testing" @@ -29,11 +30,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" ) +func makeIPNet(t *testing.T) *net.IPNet { + _, net, err := net.ParseCIDR("1.2.3.0/24") + if err != nil { + t.Error(err) + } + return net +} + func 
diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go
index 1e82e197b8..5907146b01 100644
--- a/pkg/registry/service/rest_test.go
+++ b/pkg/registry/service/rest_test.go
@@ -18,6 +18,7 @@ package service
 
 import (
 	"fmt"
+	"net"
 	"reflect"
 	"testing"
@@ -29,11 +30,19 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
 )
 
+func makeIPNet(t *testing.T) *net.IPNet {
+	_, net, err := net.ParseCIDR("1.2.3.0/24")
+	if err != nil {
+		t.Error(err)
+	}
+	return net
+}
+
 func TestServiceRegistryCreate(t *testing.T) {
 	registry := registrytest.NewServiceRegistry()
 	fakeCloud := &cloud.FakeCloud{}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	svc := &api.Service{
 		Port:     6502,
 		TypeMeta: api.TypeMeta{ID: "foo"},
@@ -49,6 +58,12 @@ func TestServiceRegistryCreate(t *testing.T) {
 	if created_service.CreationTimestamp.IsZero() {
 		t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp)
 	}
+	if created_service.PortalIP != "1.2.3.1" {
+		t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP)
+	}
+	if created_service.ProxyPort != 0 {
+		t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort)
+	}
 	if len(fakeCloud.Calls) != 0 {
 		t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
 	}
@@ -63,7 +78,7 @@ func TestServiceRegistryCreate(t *testing.T) {
 
 func TestServiceStorageValidatesCreate(t *testing.T) {
 	registry := registrytest.NewServiceRegistry()
-	storage := NewREST(registry, nil, nil)
+	storage := NewREST(registry, nil, nil, makeIPNet(t))
 	failureCases := map[string]api.Service{
 		"empty ID": {
 			Port:     6502,
@@ -96,7 +111,7 @@ func TestServiceRegistryUpdate(t *testing.T) {
 		TypeMeta: api.TypeMeta{ID: "foo"},
 		Selector: map[string]string{"bar": "baz1"},
 	})
-	storage := NewREST(registry, nil, nil)
+	storage := NewREST(registry, nil, nil, makeIPNet(t))
 	c, err := storage.Update(ctx, &api.Service{
 		Port:     6502,
 		TypeMeta: api.TypeMeta{ID: "foo"},
@@ -126,7 +141,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
 		TypeMeta: api.TypeMeta{ID: "foo"},
 		Selector: map[string]string{"bar": "baz"},
 	})
-	storage := NewREST(registry, nil, nil)
+	storage := NewREST(registry, nil, nil, makeIPNet(t))
 	failureCases := map[string]api.Service{
 		"empty ID": {
 			Port:     6502,
@@ -155,7 +170,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
 	registry := registrytest.NewServiceRegistry()
 	fakeCloud := &cloud.FakeCloud{}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	svc := &api.Service{
 		Port:     6502,
 		TypeMeta: api.TypeMeta{ID: "foo"},
@@ -182,7 +197,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
 		Err: fmt.Errorf("test error"),
 	}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	svc := &api.Service{
 		Port:     6502,
 		TypeMeta: api.TypeMeta{ID: "foo"},
@@ -205,7 +220,7 @@ func TestServiceRegistryDelete(t *testing.T) {
 	registry := registrytest.NewServiceRegistry()
 	fakeCloud := &cloud.FakeCloud{}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	svc := &api.Service{
 		TypeMeta: api.TypeMeta{ID: "foo"},
 		Selector: map[string]string{"bar": "baz"},
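One quirk worth noting in makeIPNet: it reports a parse failure with t.Error but still returns the (then nil) *net.IPNet, so a bad CIDR would surface later as a confusing nil-pointer failure inside NewREST rather than at the parse site. A hypothetical stricter variant (the name mustParseCIDR is ours, not the patch's) would abort immediately:

// mustParseCIDR is a hypothetical stricter take on makeIPNet: t.Fatalf stops
// the test at the parse site instead of letting a nil *net.IPNet propagate
// into NewREST. Assumes the same "net" and "testing" imports as rest_test.go.
func mustParseCIDR(t *testing.T, cidr string) *net.IPNet {
	_, subnet, err := net.ParseCIDR(cidr)
	if err != nil {
		t.Fatalf("bad CIDR %q: %v", cidr, err)
	}
	return subnet
}

With the constant "1.2.3.0/24" the distinction is academic, but it keeps the helper safe if the CIDR ever becomes configurable.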
[]string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) svc := &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -253,18 +268,21 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) { Selector: map[string]string{"bar": "baz"}, Port: 8080, Protocol: "TCP", + PortalIP: "1.2.3.4", }, { TypeMeta: api.TypeMeta{ID: "abc-123"}, Selector: map[string]string{"bar": "baz"}, Port: 8081, Protocol: "UDP", + PortalIP: "5.6.7.8", }, { TypeMeta: api.TypeMeta{ID: "q-u-u-x"}, Selector: map[string]string{"bar": "baz"}, Port: 8082, Protocol: "", + PortalIP: "9.8.7.6", }, }, } @@ -274,28 +292,27 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) { t.Errorf("Unexpected err: %v", err) } expected := []api.EnvVar{ - {Name: "FOO_BAR_SERVICE_HOST", Value: "machine"}, + {Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"}, {Name: "FOO_BAR_SERVICE_PORT", Value: "8080"}, - {Name: "FOO_BAR_PORT", Value: "tcp://machine:8080"}, - {Name: "FOO_BAR_PORT_8080_TCP", Value: "tcp://machine:8080"}, + {Name: "FOO_BAR_PORT", Value: "tcp://1.2.3.4:8080"}, + {Name: "FOO_BAR_PORT_8080_TCP", Value: "tcp://1.2.3.4:8080"}, {Name: "FOO_BAR_PORT_8080_TCP_PROTO", Value: "tcp"}, {Name: "FOO_BAR_PORT_8080_TCP_PORT", Value: "8080"}, - {Name: "FOO_BAR_PORT_8080_TCP_ADDR", Value: "machine"}, - {Name: "ABC_123_SERVICE_HOST", Value: "machine"}, + {Name: "FOO_BAR_PORT_8080_TCP_ADDR", Value: "1.2.3.4"}, + {Name: "ABC_123_SERVICE_HOST", Value: "5.6.7.8"}, {Name: "ABC_123_SERVICE_PORT", Value: "8081"}, - {Name: "ABC_123_PORT", Value: "udp://machine:8081"}, - {Name: "ABC_123_PORT_8081_UDP", Value: "udp://machine:8081"}, + {Name: "ABC_123_PORT", Value: "udp://5.6.7.8:8081"}, + {Name: "ABC_123_PORT_8081_UDP", Value: "udp://5.6.7.8:8081"}, {Name: "ABC_123_PORT_8081_UDP_PROTO", Value: "udp"}, {Name: "ABC_123_PORT_8081_UDP_PORT", Value: "8081"}, - {Name: "ABC_123_PORT_8081_UDP_ADDR", Value: "machine"}, - {Name: "Q_U_U_X_SERVICE_HOST", Value: "machine"}, + {Name: "ABC_123_PORT_8081_UDP_ADDR", Value: "5.6.7.8"}, + {Name: "Q_U_U_X_SERVICE_HOST", Value: "9.8.7.6"}, {Name: "Q_U_U_X_SERVICE_PORT", Value: "8082"}, - {Name: "Q_U_U_X_PORT", Value: "tcp://machine:8082"}, - {Name: "Q_U_U_X_PORT_8082_TCP", Value: "tcp://machine:8082"}, + {Name: "Q_U_U_X_PORT", Value: "tcp://9.8.7.6:8082"}, + {Name: "Q_U_U_X_PORT_8082_TCP", Value: "tcp://9.8.7.6:8082"}, {Name: "Q_U_U_X_PORT_8082_TCP_PROTO", Value: "tcp"}, {Name: "Q_U_U_X_PORT_8082_TCP_PORT", Value: "8082"}, - {Name: "Q_U_U_X_PORT_8082_TCP_ADDR", Value: "machine"}, - {Name: "SERVICE_HOST", Value: "machine"}, + {Name: "Q_U_U_X_PORT_8082_TCP_ADDR", Value: "9.8.7.6"}, } if len(vars) != len(expected) { t.Errorf("Expected %d env vars, got: %+v", len(expected), vars) @@ -313,7 +330,7 @@ func TestServiceRegistryGet(t *testing.T) { registry := registrytest.NewServiceRegistry() fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} - storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{})) + storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) registry.CreateService(ctx, &api.Service{ TypeMeta: api.TypeMeta{ID: "foo"}, Selector: map[string]string{"bar": "baz"}, @@ -333,7 +350,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { registry.Endpoints 
@@ -313,7 +330,7 @@ func TestServiceRegistryGet(t *testing.T) {
 	registry := registrytest.NewServiceRegistry()
 	fakeCloud := &cloud.FakeCloud{}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	registry.CreateService(ctx, &api.Service{
 		TypeMeta: api.TypeMeta{ID: "foo"},
 		Selector: map[string]string{"bar": "baz"},
@@ -333,7 +350,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
 	registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
 	fakeCloud := &cloud.FakeCloud{}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	registry.CreateService(ctx, &api.Service{
 		TypeMeta: api.TypeMeta{ID: "foo"},
 		Selector: map[string]string{"bar": "baz"},
@@ -362,7 +379,7 @@ func TestServiceRegistryList(t *testing.T) {
 	registry := registrytest.NewServiceRegistry()
 	fakeCloud := &cloud.FakeCloud{}
 	machines := []string{"foo", "bar", "baz"}
-	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+	storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
 	registry.CreateService(ctx, &api.Service{
 		TypeMeta: api.TypeMeta{ID: "foo"},
 		Selector: map[string]string{"bar": "baz"},
@@ -390,3 +407,194 @@
 		t.Errorf("Unexpected resource version: %#v", sl)
 	}
 }
+
+func TestServiceRegistryIPAllocation(t *testing.T) {
+	registry := registrytest.NewServiceRegistry()
+	fakeCloud := &cloud.FakeCloud{}
+	machines := []string{"foo", "bar", "baz"}
+	rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
+
+	svc1 := &api.Service{
+		Port:     6502,
+		TypeMeta: api.TypeMeta{ID: "foo"},
+		Selector: map[string]string{"bar": "baz"},
+	}
+	ctx := api.NewDefaultContext()
+	c1, _ := rest.Create(ctx, svc1)
+	created_svc1 := <-c1
+	created_service_1 := created_svc1.(*api.Service)
+	if created_service_1.ID != "foo" {
+		t.Errorf("Expected foo, but got %v", created_service_1.ID)
+	}
+	if created_service_1.PortalIP != "1.2.3.1" {
+		t.Errorf("Unexpected PortalIP: %s", created_service_1.PortalIP)
+	}
+
+	svc2 := &api.Service{
+		Port:     6502,
+		TypeMeta: api.TypeMeta{ID: "bar"},
+		Selector: map[string]string{"bar": "baz"},
+	}
+	ctx = api.NewDefaultContext()
+	c2, _ := rest.Create(ctx, svc2)
+	created_svc2 := <-c2
+	created_service_2 := created_svc2.(*api.Service)
+	if created_service_2.ID != "bar" {
+		t.Errorf("Expected bar, but got %v", created_service_2.ID)
+	}
+	if created_service_2.PortalIP != "1.2.3.2" { // new IP
+		t.Errorf("Unexpected PortalIP: %s", created_service_2.PortalIP)
+	}
+}
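The reallocation test that follows depends on Delete returning the portal IP to the pool (the rs.portalMgr.Release call in rest.go). In terms of the simplified allocator sketched earlier (hypothetical helper names, not the patch's API), the property under test reduces to:

// TestAllocatorReuse distills the property TestServiceRegistryIPReallocation
// checks end-to-end: an IP released by a delete is handed out again on the
// next allocation.
func TestAllocatorReuse(t *testing.T) {
	_, subnet, err := net.ParseCIDR("1.2.3.0/24")
	if err != nil {
		t.Fatal(err)
	}
	a := newIPAllocator(subnet)
	first, _ := a.Allocate() // 1.2.3.1
	a.Release(first)
	second, _ := a.Allocate()
	if !second.Equal(first) {
		t.Errorf("expected released IP %s to be reused, got %s", first, second)
	}
}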
t.Errorf("Expected bar, but got %v", created_service_2.ID) + } + if created_service_2.PortalIP != "1.2.3.1" { // same IP as before + t.Errorf("Unexpected PortalIP: %s", created_service_2.PortalIP) + } +} + +func TestServiceRegistryIPUpdate(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx := api.NewDefaultContext() + c, _ := rest.Create(ctx, svc) + created_svc := <-c + created_service := created_svc.(*api.Service) + if created_service.Port != 6502 { + t.Errorf("Expected port 6502, but got %v", created_service.Port) + } + if created_service.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } + if created_service.ProxyPort != 0 { + t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort) + } + + update := new(api.Service) + *update = *created_service + update.Port = 6503 + update.PortalIP = "8.6.7.5" + update.ProxyPort = 309 + + c, _ = rest.Update(ctx, update) + updated_svc := <-c + updated_service := updated_svc.(*api.Service) + if updated_service.Port != 6503 { + t.Errorf("Expected port 6503, but got %v", updated_service.Port) + } + if updated_service.PortalIP != "1.2.3.1" { // unchanged, despite trying + t.Errorf("Unexpected PortalIP: %s", updated_service.PortalIP) + } + if updated_service.ProxyPort != 0 { // unchanged, despite trying + t.Errorf("Unexpected ProxyPort: %d", updated_service.ProxyPort) + } +} + +func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + CreateExternalLoadBalancer: true, + } + ctx := api.NewDefaultContext() + c, _ := rest.Create(ctx, svc) + created_svc := <-c + created_service := created_svc.(*api.Service) + if created_service.Port != 6502 { + t.Errorf("Expected port 6502, but got %v", created_service.Port) + } + if created_service.PortalIP != "1.2.3.1" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } + if created_service.ProxyPort != 6502 { + t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort) + } +} + +func TestServiceRegistryIPReloadFromStorage(t *testing.T) { + registry := registrytest.NewServiceRegistry() + fakeCloud := &cloud.FakeCloud{} + machines := []string{"foo", "bar", "baz"} + rest1 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc := &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + ctx := api.NewDefaultContext() + c, _ := rest1.Create(ctx, svc) + <-c + svc = &api.Service{ + Port: 6502, + TypeMeta: api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + c, _ = rest1.Create(ctx, svc) + <-c + + // This will reload from storage, finding the previous 2 + rest2 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) + + svc = &api.Service{ + Port: 6502, + TypeMeta: 
api.TypeMeta{ID: "foo"}, + Selector: map[string]string{"bar": "baz"}, + } + c, _ = rest2.Create(ctx, svc) + created_svc := <-c + created_service := created_svc.(*api.Service) + if created_service.PortalIP != "1.2.3.3" { + t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP) + } +}