Core support for ip-per-service

pull/6/head
Tim Hockin 2014-09-18 16:03:34 -07:00
parent 63e1906902
commit e907011111
26 changed files with 735 additions and 227 deletions
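In rough terms, this change gives every Service its own virtual "portal" IP: the apiserver allocates a PortalIP for each Service out of the PORTAL_NET CIDR, and kube-proxy on each node installs iptables NAT rules that steer traffic for portalIP:port to a locally bound proxy port, which then load-balances to the real endpoints. The hunks below wire this through the cluster configs, the apiserver flags and REST layer, the API types, the docs, and the proxier itself.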

View File

@ -31,3 +31,4 @@ MINION_IP_RANGES=($(eval echo "10.244.{1..${NUM_MINIONS}}.0/24"))
MINION_SCOPES="compute-rw"
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
POLL_SLEEP_INTERVAL=3
PORTAL_NET="10.0.0.0/16"

View File

@ -31,3 +31,4 @@ MINION_IP_RANGES=($(eval echo "10.245.{1..${NUM_MINIONS}}.0/24"))
MINION_SCOPES=""
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
POLL_SLEEP_INTERVAL=3
PORTAL_NET="10.0.0.0/16"

View File

@ -21,6 +21,7 @@
mkdir -p /srv/salt-overlay/pillar
cat <<EOF >/srv/salt-overlay/pillar/cluster-params.sls
node_instance_prefix: $NODE_INSTANCE_PREFIX
portal_net: $PORTAL_NET
EOF
mkdir -p /srv/salt-overlay/salt/nginx

View File

@ -263,6 +263,7 @@ function kube-up {
echo "readonly SERVER_BINARY_TAR_URL='${SERVER_BINARY_TAR_URL}'"
echo "readonly SALT_TAR_URL='${SALT_TAR_URL}'"
echo "readonly MASTER_HTPASSWD='${htpasswd}'"
echo "readonly PORTAL_NET='${PORTAL_NET}'"
grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/create-dynamic-salt-files.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/download-release.sh"
grep -v "^#" "${KUBE_ROOT}/cluster/gce/templates/salt-master.sh"

View File

@ -55,5 +55,8 @@
{%- set minion_regexp = "" %}
{% endif %}
{% endif %}
{% if pillar['portal_net'] is defined %}
{% set portal_net = "-portal_net=" + pillar['portal_net'] %}
{% endif %}
DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}}"
DAEMON_ARGS="{{daemon_args}} {{address}} {{machines}} {{etcd_servers}} {{ minion_regexp }} {{ cloud_provider }} --allow_privileged={{pillar['allow_privileged']}} {{portal_net}}"

View File

@ -22,7 +22,7 @@ DAEMON_ARGS=""
DAEMON_LOG_FILE=/var/log/$NAME.log
PIDFILE=/var/run/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
DAEMON_USER=kube-proxy
DAEMON_USER=root
# Exit if the package is not installed
[ -x "$DAEMON" ] || exit 0

View File

@ -64,6 +64,7 @@ var (
machineList util.StringList
corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
portalNet util.IPNet // TODO: make this a list
// TODO: Discover these by pinging the host machines, and rip out these flags.
nodeMilliCPU = flag.Int("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node")
nodeMemory = flag.Int("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
@ -75,6 +76,7 @@ func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config")
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
}
func verifyMinionFlags() {
@ -89,6 +91,13 @@ func verifyMinionFlags() {
}
}
// TODO: Longer term we should read this from some config store, rather than a flag.
func verifyPortalFlags() {
if portalNet.IP == nil {
glog.Fatal("No -portal_net specified")
}
}
func initCloudProvider(name string, configFilePath string) cloudprovider.Interface {
var config *os.File
@ -141,6 +150,7 @@ func main() {
verflag.PrintAndExitIfRequested()
verifyMinionFlags()
verifyPortalFlags()
if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) {
glog.Fatalf("specify either -etcd_servers or -etcd_config")
@ -172,6 +182,7 @@ func main() {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
n := net.IPNet(portalNet)
m := master.New(&master.Config{
Client: client,
Cloud: cloud,
@ -188,6 +199,7 @@ func main() {
resources.Memory: util.NewIntOrStringFromInt(*nodeMemory),
},
},
PortalNet: &n,
})
mux := http.NewServeMux()
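For reference, `flag.Var` accepts anything implementing `flag.Value`, which is what lets `util.IPNet` be registered as `-portal_net` above. A minimal standalone sketch of that pattern using only the standard library (the `ipNetFlag` name is hypothetical; the real type lives in pkg/util):

```go
package main

import (
	"flag"
	"fmt"
	"net"
)

// ipNetFlag is a hypothetical stand-in for util.IPNet: a net.IPNet that
// satisfies flag.Value so it can be registered with flag.Var.
type ipNetFlag struct {
	net.IPNet
}

func (f *ipNetFlag) String() string {
	if f.IP == nil {
		return ""
	}
	return f.IPNet.String()
}

func (f *ipNetFlag) Set(value string) error {
	_, n, err := net.ParseCIDR(value)
	if err != nil {
		return err
	}
	f.IPNet = *n
	return nil
}

var portalNet ipNetFlag

func main() {
	flag.Var(&portalNet, "portal_net", "A CIDR notation IP range from which to assign portal IPs.")
	flag.Parse()
	if portalNet.IP == nil {
		fmt.Println("No -portal_net specified") // mirrors verifyPortalFlags
		return
	}
	fmt.Println("portal net:", portalNet.String())
}
```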

View File

@ -123,11 +123,16 @@ func startComponents(manifestURL string) (apiServerURL string) {
}
// Master
_, portalNet, err := net.ParseCIDR("10.0.0.0/24")
if err != nil {
glog.Fatalf("Unable to parse CIDR: %v", err)
}
m := master.New(&master.Config{
Client: cl,
EtcdHelper: helper,
Minions: machineList,
PodInfoGetter: fakePodInfoGetter{},
PortalNet: portalNet,
})
mux := http.NewServeMux()
apiserver.NewAPIGroup(m.API_v1beta1()).InstallREST(mux, "/api/v1beta1")
@ -349,18 +354,33 @@ func runServiceTest(client *client.Client) {
if err := wait.Poll(time.Second, time.Second*20, podExists(client, ctx, pod.ID)); err != nil {
glog.Fatalf("FAILED: pod never started running %v", err)
}
svc := api.Service{
svc1 := api.Service{
TypeMeta: api.TypeMeta{ID: "service1"},
Selector: map[string]string{
"name": "thisisalonglabel",
},
Port: 8080,
}
_, err = client.CreateService(ctx, &svc)
_, err = client.CreateService(ctx, &svc1)
if err != nil {
glog.Fatalf("Failed to create service: %v, %v", svc, err)
glog.Fatalf("Failed to create service: %v, %v", svc1, err)
}
if err := wait.Poll(time.Second, time.Second*10, endpointsSet(client, ctx, svc.ID, 1)); err != nil {
if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, ctx, svc1.ID, 1)); err != nil {
glog.Fatalf("FAILED: unexpected endpoints: %v", err)
}
// A second service with the same port.
svc2 := api.Service{
TypeMeta: api.TypeMeta{ID: "service2"},
Selector: map[string]string{
"name": "thisisalonglabel",
},
Port: 8080,
}
_, err = client.CreateService(ctx, &svc2)
if err != nil {
glog.Fatalf("Failed to create service: %v, %v", svc2, err)
}
if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, ctx, svc2.ID, 1)); err != nil {
glog.Fatalf("FAILED: unexpected endpoints: %v", err)
}
glog.Info("Service test passed.")

View File

@ -25,6 +25,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
@ -40,7 +42,7 @@ var (
func init() {
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config")
flag.Var(&bindAddress, "bind_address", "The address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
flag.Var(&bindAddress, "bind_address", "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
}
func main() {
@ -97,12 +99,12 @@ func main() {
}
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress))
proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress), iptables.New(exec.New()))
// Wire proxier to handle changes to services
serviceConfig.RegisterHandler(proxier)
// And wire loadBalancer to handle changes to endpoints to services
endpointsConfig.RegisterHandler(loadBalancer)
// Just loop forever for now...
select {}
proxier.SyncLoop()
}
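The idle `select {}` is replaced by `SyncLoop`, which (per the proxier hunk below) periodically re-runs iptablesInit and re-opens portals, so the NAT rules are restored if something flushes them.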

View File

@ -85,7 +85,7 @@ redis-slave-controller gurpartap/redis name=redis,role=slave 2
The redis slave configures itself by looking for the Kubernetes service environment variables in the container environment. In particular, the redis slave is started with the following command:
```shell
redis-server --slaveof $SERVICE_HOST $REDIS_MASTER_SERVICE_PORT
redis-server --slaveof $REDIS_MASTER_SERVICE_HOST $REDIS_MASTER_SERVICE_PORT
```
Once that's up you can list the pods in the cluster, to verify that the master and slaves are running:
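This doc change tracks the removal of the shared `SERVICE_HOST` variable later in this commit: each service now publishes its own `<NAME>_SERVICE_HOST`, carrying the service's portal IP rather than the host machine's address.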

View File

@ -71,7 +71,7 @@ func HandleError(result interface{}, err error) (r interface{}) {
}
func main() {
pool = simpleredis.NewConnectionPoolHost(os.Getenv("SERVICE_HOST") + ":" + os.Getenv("REDIS_MASTER_SERVICE_PORT"))
pool = simpleredis.NewConnectionPoolHost(os.Getenv("REDIS_MASTER_SERVICE_HOST") + ":" + os.Getenv("REDIS_MASTER_SERVICE_PORT"))
defer pool.Close()
r := mux.NewRouter()

View File

@ -71,7 +71,8 @@ ${GO_OUT}/apiserver \
--port="${API_PORT}" \
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--machines="127.0.0.1" \
--minion_port=${KUBELET_PORT} 1>&2 &
--minion_port=${KUBELET_PORT} \
--portal_net="10.0.0.0/24" 1>&2 &
APISERVER_PID=$!
wait_for_url "http://127.0.0.1:${API_PORT}/healthz" "apiserver: "

View File

@ -423,6 +423,13 @@ type Service struct {
// ContainerPort is the name of the port on the container to direct traffic to.
// Optional, if unspecified use the first port on the container.
ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"`
// PortalIP is assigned by the master. If specified by the user it will be ignored.
// TODO: This is awkward - if we had a BoundService, it would be better factored.
PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"`
// ProxyPort is assigned by the master. If specified by the user it will be ignored.
ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"`
}
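On the wire, the two master-assigned fields simply show up in the serialized object. A hypothetical, heavily trimmed sketch of the shape (the real Service type has many more fields, and the values here are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// A trimmed, hypothetical view of the Service fields relevant here.
// PortalIP and ProxyPort are master-assigned, so clients should leave
// them empty on create.
type service struct {
	ID        string `json:"id,omitempty"`
	Port      int    `json:"port,omitempty"`
	PortalIP  string `json:"portalIP,omitempty"`
	ProxyPort int    `json:"proxyPort,omitempty"`
}

func main() {
	// What a stored object might look like after the master fills in
	// the portal IP (values are illustrative).
	s := service{ID: "redis-master", Port: 6379, PortalIP: "10.0.0.5"}
	b, _ := json.Marshal(s)
	fmt.Println(string(b))
	// {"id":"redis-master","port":6379,"portalIP":"10.0.0.5"}
}
```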
// Endpoints is a collection of endpoints that implement the actual service, for example:

View File

@ -451,6 +451,12 @@ type Service struct {
// ContainerPort is the name of the port on the container to direct traffic to.
// Optional, if unspecified use the first port on the container.
ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"`
// PortalIP is assigned by the master. If specified by the user it will be ignored.
PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"`
// ProxyPort is assigned by the master. If specified by the user it will be ignored.
ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"`
}
// Endpoints is a collection of endpoints that implement the actual service, for example:

View File

@ -416,6 +416,12 @@ type Service struct {
// ContainerPort is the name of the port on the container to direct traffic to.
// Optional, if unspecified use the first port on the container.
ContainerPort util.IntOrString `json:"containerPort,omitempty" yaml:"containerPort,omitempty"`
// PortalIP is assigned by the master. If specified by the user it will be ignored.
PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"`
// ProxyPort is assigned by the master. If specified by the user it will be ignored.
ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"`
}
// Endpoints is a collection of endpoints that implement the actual service, for example:

View File

@ -570,6 +570,11 @@ type ReplicationControllerList struct {
// ServiceStatus represents the current status of a service
type ServiceStatus struct {
// PortalIP is assigned by the master.
PortalIP string `json:"portalIP,omitempty" yaml:"portalIP,omitempty"`
// ProxyPort is assigned by the master. If 0, the proxy will choose an ephemeral port.
ProxyPort int `json:"proxyPort,omitempty" yaml:"proxyPort,omitempty"`
}
// ServiceSpec describes the attributes that a user creates on a service

View File

@ -140,7 +140,7 @@ func (h *HumanReadablePrinter) validatePrintHandlerFunc(printFunc reflect.Value)
var podColumns = []string{"ID", "Image(s)", "Host", "Labels", "Status"}
var replicationControllerColumns = []string{"ID", "Image(s)", "Selector", "Replicas"}
var serviceColumns = []string{"ID", "Labels", "Selector", "Port"}
var serviceColumns = []string{"ID", "Labels", "Selector", "IP", "Port"}
var minionColumns = []string{"Minion identifier"}
var statusColumns = []string{"Status"}
var eventColumns = []string{"Name", "Kind", "Status", "Reason", "Message"}
@ -226,8 +226,8 @@ func printReplicationControllerList(list *api.ReplicationControllerList, w io.Wr
}
func printService(svc *api.Service, w io.Writer) error {
_, err := fmt.Fprintf(w, "%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels),
labels.Set(svc.Selector), svc.Port)
_, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels),
labels.Set(svc.Selector), svc.PortalIP, svc.Port)
return err
}

View File

@ -152,7 +152,7 @@ func (h *HumanReadablePrinter) validatePrintHandlerFunc(printFunc reflect.Value)
var podColumns = []string{"ID", "IMAGE(S)", "HOST", "LABELS", "STATUS"}
var replicationControllerColumns = []string{"ID", "IMAGE(S)", "SELECTOR", "REPLICAS"}
var serviceColumns = []string{"ID", "LABELS", "SELECTOR", "PORT"}
var serviceColumns = []string{"ID", "LABELS", "SELECTOR", "IP", "PORT"}
var minionColumns = []string{"ID"}
var statusColumns = []string{"STATUS"}
@ -222,8 +222,8 @@ func printReplicationControllerList(list *api.ReplicationControllerList, w io.Wr
}
func printService(svc *api.Service, w io.Writer) error {
_, err := fmt.Fprintf(w, "%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels),
labels.Set(svc.Selector), svc.Port)
_, err := fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", svc.ID, labels.Set(svc.Labels),
labels.Set(svc.Selector), svc.PortalIP, svc.Port)
return err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package master
import (
"net"
"net/http"
"time"
@ -54,6 +55,7 @@ type Config struct {
MinionRegexp string
PodInfoGetter client.PodInfoGetter
NodeResources api.NodeResources
PortalNet *net.IPNet
}
// Master contains state for a Kubernetes cluster master/api server.
@ -67,6 +69,7 @@ type Master struct {
eventRegistry generic.Registry
storage map[string]apiserver.RESTStorage
client *client.Client
portalNet *net.IPNet
}
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
@ -98,6 +101,7 @@ func New(c *Config) *Master {
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
minionRegistry: minionRegistry,
client: c.Client,
portalNet: c.PortalNet,
}
m.init(c)
return m
@ -137,7 +141,7 @@ func (m *Master) init(c *Config) {
Minions: m.client,
}),
"replicationControllers": controller.NewREST(m.controllerRegistry, m.podRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry),
"services": service.NewREST(m.serviceRegistry, c.Cloud, m.minionRegistry, m.portalNet),
"endpoints": endpoint.NewREST(m.endpointRegistry),
"minions": minion.NewREST(m.minionRegistry),
"events": event.NewREST(m.eventRegistry),

View File

@ -27,16 +27,19 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
"github.com/golang/glog"
)
type serviceInfo struct {
port int
protocol api.Protocol
socket proxySocket
timeout time.Duration
mu sync.Mutex // protects active
active bool
portalIP net.IP
portalPort int
protocol api.Protocol
proxyPort int
socket proxySocket
timeout time.Duration
mu sync.Mutex // protects active
active bool
}
func (si *serviceInfo) isActive() bool {
@ -64,7 +67,7 @@ type proxySocket interface {
// on the impact of calling Close while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, proxier *Proxier)
ProxyLoop(service string, info *serviceInfo, proxier *Proxier)
}
// tcpProxySocket implements proxySocket. Close() is implemented by net.Listener. When Close() is called,
@ -73,12 +76,7 @@ type tcpProxySocket struct {
net.Listener
}
func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
info, found := proxier.getServiceInfo(service)
if !found {
glog.Errorf("Failed to find service: %s", service)
return
}
func (tcp *tcpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) {
for {
if !info.isActive() {
break
@ -97,7 +95,7 @@ func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
inConn.Close()
continue
}
glog.V(3).Infof("Mapped service %s to endpoint %s", service, endpoint)
glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout)
@ -118,22 +116,23 @@ func proxyTCP(in, out *net.TCPConn) {
wg.Add(2)
glog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
go copyBytes(in, out, &wg)
go copyBytes(out, in, &wg)
go copyBytes("from backend", in, out, &wg)
go copyBytes("to backend", out, in, &wg)
wg.Wait()
in.Close()
out.Close()
}
func copyBytes(in, out *net.TCPConn, wg *sync.WaitGroup) {
func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
defer wg.Done()
glog.V(4).Infof("Copying from %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
if _, err := io.Copy(in, out); err != nil {
glog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
n, err := io.Copy(dest, src)
if err != nil {
glog.Errorf("I/O error: %v", err)
}
in.CloseRead()
out.CloseWrite()
glog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
dest.CloseWrite()
src.CloseRead()
}
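The rewritten copyBytes half-closes rather than fully closing: it shuts only the write side of the destination and the read side of the source, so the opposite direction of the TCP session can keep flowing until it reaches EOF on its own.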
// udpProxySocket implements proxySocket. Close() is implemented by net.UDPConn. When Close() is called,
@ -157,12 +156,7 @@ func newClientCache() *clientCache {
return &clientCache{clients: map[string]net.Conn{}}
}
func (udp *udpProxySocket) ProxyLoop(service string, proxier *Proxier) {
info, found := proxier.getServiceInfo(service)
if !found {
glog.Errorf("Failed to find service: %s", service)
return
}
func (udp *udpProxySocket) ProxyLoop(service string, info *serviceInfo, proxier *Proxier) {
activeClients := newClientCache()
var buffer [4096]byte // 4KiB should be enough for most whole-packets
for {
@ -220,7 +214,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
return nil, err
}
glog.V(4).Infof("Mapped service %s to endpoint %s", service, endpoint)
glog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
svrConn, err = net.DialTimeout("udp", endpoint, endpointDialTimeout)
if err != nil {
// TODO: Try another endpoint?
@ -237,6 +231,7 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
}
// This function is expected to be called as a goroutine.
// TODO: Track and log bytes copied, like TCP
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, timeout time.Duration) {
defer svrConn.Close()
var buffer [4096]byte
@ -302,19 +297,64 @@ func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, er
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
address net.IP
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
listenAddress net.IP
iptables iptables.Interface
}
// NewProxier returns a new Proxier given a LoadBalancer and an
// address on which to listen
func NewProxier(loadBalancer LoadBalancer, address net.IP) *Proxier {
// NewProxier returns a new Proxier given a LoadBalancer and an address on
// which to listen. Because of the iptables logic, It is assumed that there
// is only a single Proxier active on a machine.
func NewProxier(loadBalancer LoadBalancer, listenAddress net.IP, iptables iptables.Interface) *Proxier {
glog.Infof("Initializing iptables")
// Set up the iptables foundations we need.
if err := iptablesInit(iptables); err != nil {
glog.Errorf("Failed to initialize iptables: %s", err)
return nil
}
// Flush old iptables rules (since the bound ports will be invalid after a restart).
// When OnUpdate() is first called, the rules will be recreated.
if err := iptablesFlush(iptables); err != nil {
glog.Errorf("Failed to flush iptables: %s", err)
return nil
}
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
address: address,
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),
listenAddress: listenAddress,
iptables: iptables,
}
}
// The periodic interval for checking the state of things.
const syncInterval = 5 * time.Second
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
for {
select {
case <-time.After(syncInterval):
glog.V(2).Infof("Periodic sync")
if err := iptablesInit(proxier.iptables); err != nil {
glog.Errorf("Failed to ensure iptables: %s", err)
}
proxier.ensurePortals()
}
}
}
// Ensure that portals exist for all services.
func (proxier *Proxier) ensurePortals() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
// NB: This does not remove rules that should not be present.
for name, info := range proxier.serviceMap {
err := proxier.openPortal(name, info)
if err != nil {
glog.Errorf("Failed to ensure portal for %q: %s", name, err)
}
}
}
@ -330,7 +370,6 @@ func (proxier *Proxier) stopProxyInternal(service string, info *serviceInfo) err
if !info.setActive(false) {
return nil
}
glog.V(3).Infof("Removing service: %s", service)
delete(proxier.serviceMap, service)
return info.socket.Close()
}
@ -348,39 +387,40 @@ func (proxier *Proxier) setServiceInfo(service string, info *serviceInfo) {
proxier.serviceMap[service] = info
}
// addServiceOnUnusedPort starts listening for a new service, returning the
// port it's using. For testing on a system with unknown ports used. The timeout only applies to UDP
// addServiceOnPort starts listening for a new service, returning the serviceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnUnusedPort(service string, protocol api.Protocol, timeout time.Duration) (string, error) {
sock, err := newProxySocket(protocol, proxier.address, 0)
func (proxier *Proxier) addServiceOnPort(service string, protocol api.Protocol, proxyPort int, timeout time.Duration) (*serviceInfo, error) {
sock, err := newProxySocket(protocol, proxier.listenAddress, proxyPort)
if err != nil {
return "", err
return nil, err
}
_, port, err := net.SplitHostPort(sock.Addr().String())
_, portStr, err := net.SplitHostPort(sock.Addr().String())
if err != nil {
return "", err
sock.Close()
return nil, err
}
portNum, err := strconv.Atoi(port)
portNum, err := strconv.Atoi(portStr)
if err != nil {
return "", err
sock.Close()
return nil, err
}
proxier.setServiceInfo(service, &serviceInfo{
port: portNum,
protocol: protocol,
active: true,
socket: sock,
timeout: timeout,
})
proxier.startAccepting(service, sock)
return port, nil
}
si := &serviceInfo{
proxyPort: portNum,
protocol: protocol,
active: true,
socket: sock,
timeout: timeout,
}
proxier.setServiceInfo(service, si)
func (proxier *Proxier) startAccepting(service string, sock proxySocket) {
glog.V(1).Infof("Listening for %s on %s:%s", service, sock.Addr().Network(), sock.Addr().String())
go func(service string, proxier *Proxier) {
glog.V(1).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service string, info *serviceInfo, proxier *Proxier) {
defer util.HandleCrash()
sock.ProxyLoop(service, proxier)
}(service, proxier)
sock.ProxyLoop(service, info, proxier)
}(service, si, proxier)
return si, nil
}
// How long we leave idle UDP connections open.
@ -395,39 +435,133 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
for _, service := range services {
activeServices.Insert(service.ID)
info, exists := proxier.getServiceInfo(service.ID)
serviceIP := net.ParseIP(service.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.isActive() && info.port == service.Port {
if exists && info.isActive() && info.portalPort == service.Port && info.portalIP.Equal(serviceIP) {
continue
}
if exists && info.port != service.Port {
err := proxier.stopProxy(service.ID, info)
if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) {
glog.V(4).Infof("Something changed for service %q: stopping it", service.ID)
err := proxier.closePortal(service.ID, info)
if err != nil {
glog.Errorf("error stopping %s: %v", service.ID, err)
glog.Errorf("Failed to close portal for %q: %s", service.ID, err)
}
err = proxier.stopProxy(service.ID, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %s", service.ID, err)
}
}
glog.V(3).Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port)
sock, err := newProxySocket(service.Protocol, proxier.address, service.Port)
glog.V(1).Infof("Adding new service %q at %s:%d/%s (local :%d)", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort)
info, err := proxier.addServiceOnPort(service.ID, service.Protocol, service.ProxyPort, udpIdleTimeout)
if err != nil {
glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err)
glog.Errorf("Failed to start proxy for %q: %+v", service.ID, err)
continue
}
proxier.setServiceInfo(service.ID, &serviceInfo{
port: service.Port,
protocol: service.Protocol,
active: true,
socket: sock,
timeout: udpIdleTimeout,
})
proxier.startAccepting(service.ID, sock)
info.portalIP = serviceIP
info.portalPort = service.Port
err = proxier.openPortal(service.ID, info)
if err != nil {
glog.Errorf("Failed to open portal for %q: %s", service.ID, err)
}
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name, info := range proxier.serviceMap {
if !activeServices.Has(name) {
err := proxier.stopProxyInternal(name, info)
glog.V(1).Infof("Stopping service %q", name)
err := proxier.closePortal(name, info)
if err != nil {
glog.Errorf("error stopping %s: %v", name, err)
glog.Errorf("Failed to close portal for %q: %s", name, err)
}
err = proxier.stopProxyInternal(name, info)
if err != nil {
glog.Errorf("Failed to stop service %q: %s", name, err)
}
}
}
}
func (proxier *Proxier) openPortal(service string, info *serviceInfo) error {
args := iptablesPortalArgs(info.portalIP, info.portalPort, proxier.listenAddress, info.proxyPort, service)
existed, err := proxier.iptables.EnsureRule(iptables.TableNAT, iptablesProxyChain, args...)
if err != nil {
glog.Errorf("Failed to install iptables %s rule for service %q", iptablesProxyChain, service)
return err
}
if !existed {
glog.Infof("Opened iptables portal for service %q on %s:%d", service, info.portalIP, info.portalPort)
}
return nil
}
func (proxier *Proxier) closePortal(service string, info *serviceInfo) error {
args := iptablesPortalArgs(info.portalIP, info.portalPort, proxier.listenAddress, info.proxyPort, service)
if err := proxier.iptables.DeleteRule(iptables.TableNAT, iptablesProxyChain, args...); err != nil {
glog.Errorf("Failed to delete iptables %s rule for service %q", iptablesProxyChain, service)
return err
}
glog.Infof("Closed iptables portal for service %q", service)
return nil
}
var iptablesProxyChain iptables.Chain = "KUBE-PROXY"
// Ensure that the iptables infrastructure we use is set up. This can safely be called periodically.
func iptablesInit(ipt iptables.Interface) error {
// TODO: There is almost certainly room for optimization here. E.g. If
// we knew the portal_net CIDR we could fast-track outbound packets not
// destined for a service. There's probably more, help wanted.
if _, err := ipt.EnsureChain(iptables.TableNAT, iptablesProxyChain); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainPrerouting, "-j", string(iptablesProxyChain)); err != nil {
return err
}
if _, err := ipt.EnsureRule(iptables.TableNAT, iptables.ChainOutput, "-j", string(iptablesProxyChain)); err != nil {
return err
}
return nil
}
// Flush all of our custom iptables rules.
func iptablesFlush(ipt iptables.Interface) error {
return ipt.FlushChain(iptables.TableNAT, iptablesProxyChain)
}
// Used below.
var zeroIP = net.ParseIP("0.0.0.0")
var localhostIP = net.ParseIP("127.0.0.1")
// Build a slice of iptables args for a portal rule.
func iptablesPortalArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service string) []string {
args := []string{
"-m", "comment",
"--comment", service,
"-p", "tcp",
"-d", destIP.String(),
"--dport", fmt.Sprintf("%d", destPort),
}
// This is tricky. If the proxy is bound (see Proxier.listenAddress)
// to 0.0.0.0 ("any interface") or 127.0.0.1, we can use REDIRECT,
// which will bring packets back to the host's loopback interface. If
// the proxy is bound to any other interface, then it is not listening
// on the host's loopback, so we have to use DNAT to that specific
// IP. We cannot simply use DNAT to 127.0.0.1 in the first case
// because from within a container, 127.0.0.1 is the container's
// loopback interface, not the host's.
//
// Why would anyone bind to an address that is not inclusive of
// localhost? Apparently some cloud environments have their public IP
// exposed as a real network interface AND do not have firewalling. We
// don't want to expose everything out to the world.
//
// Unfortunately, I don't know of any way to listen on some (N > 1)
// interfaces but not ALL interfaces, short of doing it manually, and
// this is simpler than that.
if proxyIP.Equal(zeroIP) || proxyIP.Equal(localhostIP) {
args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
} else {
args = append(args, "-j", "DNAT", "--to-destination", fmt.Sprintf("%s:%d", proxyIP.String(), proxyPort))
}
return args
}
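To make the REDIRECT-versus-DNAT branch concrete, here is a standalone sketch using a simplified copy of iptablesPortalArgs that prints the full rule for both cases (the service name, IPs, and proxy port are made up):

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// portalArgs is a simplified copy of iptablesPortalArgs from this patch.
func portalArgs(destIP net.IP, destPort int, proxyIP net.IP, proxyPort int, service string) []string {
	args := []string{"-m", "comment", "--comment", service, "-p", "tcp",
		"-d", destIP.String(), "--dport", fmt.Sprintf("%d", destPort)}
	if proxyIP.Equal(net.ParseIP("0.0.0.0")) || proxyIP.Equal(net.ParseIP("127.0.0.1")) {
		// Proxy listens on loopback/any: REDIRECT keeps packets on the host.
		args = append(args, "-j", "REDIRECT", "--to-ports", fmt.Sprintf("%d", proxyPort))
	} else {
		// Proxy bound to a specific interface: DNAT to that address.
		args = append(args, "-j", "DNAT", "--to-destination", fmt.Sprintf("%s:%d", proxyIP.String(), proxyPort))
	}
	return args
}

func main() {
	fmt.Println("iptables -t nat -A KUBE-PROXY",
		strings.Join(portalArgs(net.ParseIP("10.0.0.1"), 80, net.ParseIP("127.0.0.1"), 40123, "myservice"), " "))
	fmt.Println("iptables -t nat -A KUBE-PROXY",
		strings.Join(portalArgs(net.ParseIP("10.0.0.1"), 80, net.ParseIP("10.240.0.5"), 40123, "myservice"), " "))
}
```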

View File

@ -28,23 +28,28 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables"
)
func waitForClosedPortTCP(p *Proxier, proxyPort string) error {
func joinHostPort(host string, port int) string {
return net.JoinHostPort(host, fmt.Sprintf("%d", port))
}
func waitForClosedPortTCP(p *Proxier, proxyPort int) error {
for i := 0; i < 50; i++ {
conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("tcp", joinHostPort("", proxyPort))
if err != nil {
return nil
}
conn.Close()
time.Sleep(1 * time.Millisecond)
}
return fmt.Errorf("port %s still open", proxyPort)
return fmt.Errorf("port %d still open", proxyPort)
}
func waitForClosedPortUDP(p *Proxier, proxyPort string) error {
func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
for i := 0; i < 50; i++ {
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("udp", joinHostPort("", proxyPort))
if err != nil {
return nil
}
@ -66,7 +71,26 @@ func waitForClosedPortUDP(p *Proxier, proxyPort string) error {
conn.Close()
time.Sleep(1 * time.Millisecond)
}
return fmt.Errorf("port %s still open", proxyPort)
return fmt.Errorf("port %d still open", proxyPort)
}
// The iptables logic has to be tested in a proper end-to-end test, so this just stubs everything out.
type fakeIptables struct{}
func (fake *fakeIptables) EnsureChain(table iptables.Table, chain iptables.Chain) (bool, error) {
return false, nil
}
func (fake *fakeIptables) FlushChain(table iptables.Table, chain iptables.Chain) error {
return nil
}
func (fake *fakeIptables) EnsureRule(table iptables.Table, chain iptables.Chain, args ...string) (bool, error) {
return false, nil
}
func (fake *fakeIptables) DeleteRule(table iptables.Table, chain iptables.Chain, args ...string) error {
return nil
}
var tcpServerPort string
@ -99,9 +123,9 @@ func init() {
go udp.Loop()
}
func testEchoTCP(t *testing.T, address, port string) {
func testEchoTCP(t *testing.T, address string, port int) {
path := "aaaaa"
res, err := http.Get("http://" + address + ":" + port + "/" + path)
res, err := http.Get("http://" + address + ":" + fmt.Sprintf("%d", port) + "/" + path)
if err != nil {
t.Fatalf("error connecting to server: %v", err)
}
@ -115,10 +139,10 @@ func testEchoTCP(t *testing.T, address, port string) {
}
}
func testEchoUDP(t *testing.T, address, port string) {
func testEchoUDP(t *testing.T, address string, port int) {
data := "abc123"
conn, err := net.Dial("udp", net.JoinHostPort(address, port))
conn, err := net.Dial("udp", joinHostPort(address, port))
if err != nil {
t.Fatalf("error connecting to server: %v", err)
}
@ -144,13 +168,13 @@ func TestTCPProxy(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", proxyPort)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
}
func TestUDPProxy(t *testing.T) {
@ -162,13 +186,13 @@ func TestUDPProxy(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoUDP(t, "127.0.0.1", proxyPort)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
}
// Helper: Stops the proxy for the named service.
@ -189,13 +213,13 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
@ -203,7 +227,7 @@ func TestTCPProxyStop(t *testing.T) {
stopProxyByName(p, "echo")
// Wait for the port to really close.
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
@ -217,13 +241,13 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
@ -231,7 +255,7 @@ func TestUDPProxyStop(t *testing.T) {
stopProxyByName(p, "echo")
// Wait for the port to really close.
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
@ -245,20 +269,20 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
@ -272,20 +296,20 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
}
@ -299,27 +323,26 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("tcp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
proxyPortNum, _ := strconv.Atoi(proxyPort)
p.OnUpdate([]api.Service{
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: proxyPortNum, Protocol: "TCP"},
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "TCP"},
})
testEchoTCP(t, "127.0.0.1", proxyPort)
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
}
func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
@ -331,27 +354,26 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("udp", net.JoinHostPort("127.0.0.1", proxyPort))
conn, err := net.Dial("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
p.OnUpdate([]api.Service{})
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
proxyPortNum, _ := strconv.Atoi(proxyPort)
p.OnUpdate([]api.Service{
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: proxyPortNum, Protocol: "UDP"},
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: svcInfo.proxyPort, ProxyPort: svcInfo.proxyPort, Protocol: "UDP"},
})
testEchoUDP(t, "127.0.0.1", proxyPort)
testEchoUDP(t, "127.0.0.1", svcInfo.proxyPort)
}
func TestTCPProxyUpdatePort(t *testing.T) {
@ -363,36 +385,36 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
svcInfo, err := p.addServiceOnPort("echo", "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
// add a new dummy listener in order to get a port that is free
l, _ := net.Listen("tcp", ":0")
_, newPort, _ := net.SplitHostPort(l.Addr().String())
newPortNum, _ := strconv.Atoi(newPort)
_, newPortStr, _ := net.SplitHostPort(l.Addr().String())
newPort, _ := strconv.Atoi(newPortStr)
l.Close()
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, newPort); err != nil {
t.Fatalf(err.Error())
}
if proxyPort == newPort {
t.Errorf("expected difference, got %s %s", newPort, proxyPort)
if svcInfo.proxyPort == newPort {
t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort)
}
p.OnUpdate([]api.Service{
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPortNum, Protocol: "TCP"},
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "TCP"},
})
if err := waitForClosedPortTCP(p, proxyPort); err != nil {
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
testEchoTCP(t, "127.0.0.1", newPort)
// Ensure the old port is released and re-usable.
l, err = net.Listen("tcp", net.JoinHostPort("", proxyPort))
l, err = net.Listen("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("can't claim released port: %s", err)
}
@ -408,36 +430,36 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
p := NewProxier(lb, net.ParseIP("127.0.0.1"), &fakeIptables{})
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
svcInfo, err := p.addServiceOnPort("echo", "UDP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
// add a new dummy listener in order to get a port that is free
pc, _ := net.ListenPacket("udp", ":0")
_, newPort, _ := net.SplitHostPort(pc.LocalAddr().String())
newPortNum, _ := strconv.Atoi(newPort)
_, newPortStr, _ := net.SplitHostPort(pc.LocalAddr().String())
newPort, _ := strconv.Atoi(newPortStr)
pc.Close()
// Wait for the socket to actually get free.
if err := waitForClosedPortUDP(p, newPort); err != nil {
t.Fatalf(err.Error())
}
if proxyPort == newPort {
t.Errorf("expected difference, got %s %s", newPort, proxyPort)
if svcInfo.proxyPort == newPort {
t.Errorf("expected difference, got %d %d", newPort, svcInfo.proxyPort)
}
p.OnUpdate([]api.Service{
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPortNum, Protocol: "UDP"},
{TypeMeta: api.TypeMeta{ID: "echo"}, Port: newPort, ProxyPort: newPort, Protocol: "UDP"},
})
if err := waitForClosedPortUDP(p, proxyPort); err != nil {
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
testEchoUDP(t, "127.0.0.1", newPort)
// Ensure the old port is released and re-usable.
pc, err = net.ListenPacket("udp", net.JoinHostPort("", proxyPort))
pc, err = net.ListenPacket("udp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("can't claim released port: %s", err)
}

View File

@ -48,10 +48,8 @@ func TestMakeManifestNoServices(t *testing.T) {
}
container := manifest.Containers[0]
if len(container.Env) != 1 ||
container.Env[0].Name != "SERVICE_HOST" ||
container.Env[0].Value != "machine" {
t.Errorf("Expected one env vars, got: %#v", manifest)
if len(container.Env) != 0 {
t.Errorf("Expected zero env vars, got: %#v", manifest)
}
if manifest.ID != "foobar" {
t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID)
@ -69,6 +67,7 @@ func TestMakeManifestServices(t *testing.T) {
Kind: util.IntstrInt,
IntVal: 900,
},
PortalIP: "1.2.3.4",
},
},
},
@ -96,7 +95,7 @@ func TestMakeManifestServices(t *testing.T) {
envs := []api.EnvVar{
{
Name: "TEST_SERVICE_HOST",
Value: "machine",
Value: "1.2.3.4",
},
{
Name: "TEST_SERVICE_PORT",
@ -104,11 +103,11 @@ func TestMakeManifestServices(t *testing.T) {
},
{
Name: "TEST_PORT",
Value: "tcp://machine:8080",
Value: "tcp://1.2.3.4:8080",
},
{
Name: "TEST_PORT_8080_TCP",
Value: "tcp://machine:8080",
Value: "tcp://1.2.3.4:8080",
},
{
Name: "TEST_PORT_8080_TCP_PROTO",
@ -120,11 +119,7 @@ func TestMakeManifestServices(t *testing.T) {
},
{
Name: "TEST_PORT_8080_TCP_ADDR",
Value: "machine",
},
{
Name: "SERVICE_HOST",
Value: "machine",
Value: "1.2.3.4",
},
}
if len(container.Env) != len(envs) {
@ -149,6 +144,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
Kind: util.IntstrInt,
IntVal: 900,
},
PortalIP: "1.2.3.4",
},
},
},
@ -186,7 +182,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
{
Name: "TEST_SERVICE_HOST",
Value: "machine",
Value: "1.2.3.4",
},
{
Name: "TEST_SERVICE_PORT",
@ -194,11 +190,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
{
Name: "TEST_PORT",
Value: "tcp://machine:8080",
Value: "tcp://1.2.3.4:8080",
},
{
Name: "TEST_PORT_8080_TCP",
Value: "tcp://machine:8080",
Value: "tcp://1.2.3.4:8080",
},
{
Name: "TEST_PORT_8080_TCP_PROTO",
@ -210,11 +206,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) {
},
{
Name: "TEST_PORT_8080_TCP_ADDR",
Value: "machine",
},
{
Name: "SERVICE_HOST",
Value: "machine",
Value: "1.2.3.4",
},
}
if len(container.Env) != len(envs) {

View File

@ -17,6 +17,8 @@ limitations under the License.
package registrytest
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -27,6 +29,7 @@ func NewServiceRegistry() *ServiceRegistry {
}
type ServiceRegistry struct {
mu sync.Mutex
List api.ServiceList
Service *api.Service
Err error
@ -39,48 +42,84 @@ type ServiceRegistry struct {
}
func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) {
return &r.List, r.Err
r.mu.Lock()
defer r.mu.Unlock()
// Return by copy to avoid data races
res := new(api.ServiceList)
*res = r.List
res.Items = append([]api.Service{}, r.List.Items...)
return res, r.Err
}
func (r *ServiceRegistry) CreateService(ctx api.Context, svc *api.Service) error {
r.mu.Lock()
defer r.mu.Unlock()
r.Service = svc
r.List.Items = append(r.List.Items, *svc)
return r.Err
}
func (r *ServiceRegistry) GetService(ctx api.Context, id string) (*api.Service, error) {
r.mu.Lock()
defer r.mu.Unlock()
r.GottenID = id
return r.Service, r.Err
}
func (r *ServiceRegistry) DeleteService(ctx api.Context, id string) error {
r.mu.Lock()
defer r.mu.Unlock()
r.DeletedID = id
r.Service = nil
return r.Err
}
func (r *ServiceRegistry) UpdateService(ctx api.Context, svc *api.Service) error {
r.mu.Lock()
defer r.mu.Unlock()
r.UpdatedID = svc.ID
r.Service = svc
return r.Err
}
func (r *ServiceRegistry) WatchServices(ctx api.Context, label labels.Selector, field labels.Selector, resourceVersion string) (watch.Interface, error) {
r.mu.Lock()
defer r.mu.Unlock()
return nil, r.Err
}
func (r *ServiceRegistry) ListEndpoints(ctx api.Context) (*api.EndpointsList, error) {
r.mu.Lock()
defer r.mu.Unlock()
return &r.EndpointsList, r.Err
}
func (r *ServiceRegistry) GetEndpoints(ctx api.Context, id string) (*api.Endpoints, error) {
r.mu.Lock()
defer r.mu.Unlock()
r.GottenID = id
return &r.Endpoints, r.Err
}
func (r *ServiceRegistry) UpdateEndpoints(ctx api.Context, e *api.Endpoints) error {
r.mu.Lock()
defer r.mu.Unlock()
r.Endpoints = *e
return r.Err
}
func (r *ServiceRegistry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
r.mu.Lock()
defer r.mu.Unlock()
return nil, r.Err
}

View File

@ -32,7 +32,6 @@ type ipAllocator struct {
}
// newIPAllocator creates and initializes a new ipAllocator object.
// FIXME: resync from storage at startup.
func newIPAllocator(subnet *net.IPNet) *ipAllocator {
if subnet == nil || subnet.IP == nil || subnet.Mask == nil {
return nil
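The allocator's body is not shown in this hunk; for orientation, a minimal stand-in with the same interface (Allocate, AllocateNext, Release), assuming sequential assignment starting just past the network address, which is consistent with the REST test below expecting `1.2.3.1` as the first IP from `1.2.3.0/24`:

```go
package main

import (
	"fmt"
	"net"
)

// ipAllocator here is a hypothetical sketch, not the patch's implementation.
type ipAllocator struct {
	subnet *net.IPNet
	used   map[string]bool
}

func newIPAllocator(subnet *net.IPNet) *ipAllocator {
	if subnet == nil || subnet.IP == nil || subnet.Mask == nil {
		return nil
	}
	return &ipAllocator{subnet: subnet, used: map[string]bool{}}
}

// Allocate marks a specific IP as in use (used when reloading from storage).
func (a *ipAllocator) Allocate(ip net.IP) error {
	if !a.subnet.Contains(ip) {
		return fmt.Errorf("IP %s does not fall within subnet %s", ip, a.subnet)
	}
	if a.used[ip.String()] {
		return fmt.Errorf("IP %s is already allocated", ip)
	}
	a.used[ip.String()] = true
	return nil
}

// AllocateNext returns the lowest free IP in the subnet.
func (a *ipAllocator) AllocateNext() (net.IP, error) {
	ip := a.subnet.IP.Mask(a.subnet.Mask)
	for {
		ip = nextIP(ip)
		if !a.subnet.Contains(ip) {
			return nil, fmt.Errorf("subnet %s is exhausted", a.subnet)
		}
		if !a.used[ip.String()] {
			a.used[ip.String()] = true
			return ip, nil
		}
	}
}

// Release returns an IP to the pool.
func (a *ipAllocator) Release(ip net.IP) {
	delete(a.used, ip.String())
}

func nextIP(ip net.IP) net.IP {
	next := make(net.IP, len(ip))
	copy(next, ip)
	for i := len(next) - 1; i >= 0; i-- {
		next[i]++
		if next[i] != 0 {
			break
		}
	}
	return next
}

func main() {
	_, subnet, _ := net.ParseCIDR("1.2.3.0/24")
	a := newIPAllocator(subnet)
	ip, _ := a.AllocateNext()
	fmt.Println(ip) // 1.2.3.1
}
```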

View File

@ -19,6 +19,7 @@ package service
import (
"fmt"
"math/rand"
"net"
"strconv"
"strings"
@ -32,21 +33,49 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// REST adapts a service registry into apiserver's RESTStorage model.
type REST struct {
registry Registry
cloud cloudprovider.Interface
machines minion.Registry
registry Registry
cloud cloudprovider.Interface
machines minion.Registry
portalMgr *ipAllocator
}
// NewREST returns a new REST.
func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry) *REST {
func NewREST(registry Registry, cloud cloudprovider.Interface, machines minion.Registry, portalNet *net.IPNet) *REST {
// TODO: Before we can replicate masters, this has to be synced (e.g. lives in etcd)
ipa := newIPAllocator(portalNet)
reloadIPsFromStorage(ipa, registry)
return &REST{
registry: registry,
cloud: cloud,
machines: machines,
registry: registry,
cloud: cloud,
machines: machines,
portalMgr: ipa,
}
}
// Helper: mark all previously allocated IPs in the allocator.
func reloadIPsFromStorage(ipa *ipAllocator, registry Registry) {
services, err := registry.ListServices(api.NewContext())
if err != nil {
// This is really bad.
glog.Errorf("can't list services to init service REST: %s", err)
return
}
for i := range services.Items {
s := &services.Items[i]
if s.PortalIP == "" {
glog.Warningf("service %q has no PortalIP", s.ID)
continue
}
if err := ipa.Allocate(net.ParseIP(s.PortalIP)); err != nil {
// This is really bad.
glog.Errorf("service %q PortalIP %s could not be allocated: %s", s.ID, s.PortalIP, err)
}
}
}
@ -61,9 +90,16 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
srv.CreationTimestamp = util.Now()
if ip, err := rs.portalMgr.AllocateNext(); err != nil {
return nil, err
} else {
srv.PortalIP = ip.String()
}
return apiserver.MakeAsync(func() (runtime.Object, error) {
// TODO: Consider moving this to a rectification loop, so that we make/remove external load balancers
// correctly no matter what http operations happen.
srv.ProxyPort = 0
if srv.CreateExternalLoadBalancer {
if rs.cloud == nil {
return nil, fmt.Errorf("requested an external service, but no cloud provider supplied.")
@ -88,6 +124,9 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
if err != nil {
return nil, err
}
// External load-balancers require a known port for the service proxy.
// TODO: If we end up brokering HostPorts between Pods and Services, this can be any port.
srv.ProxyPort = srv.Port
}
err := rs.registry.CreateService(ctx, srv)
if err != nil {
@ -110,6 +149,7 @@ func (rs *REST) Delete(ctx api.Context, id string) (<-chan runtime.Object, error
if err != nil {
return nil, err
}
rs.portalMgr.Release(net.ParseIP(service.PortalIP))
return apiserver.MakeAsync(func() (runtime.Object, error) {
rs.deleteExternalLoadBalancer(service)
return &api.Status{Status: api.StatusSuccess}, rs.registry.DeleteService(ctx, id)
@ -161,16 +201,13 @@ func GetServiceEnvironmentVariables(ctx api.Context, registry Registry, machine
for _, service := range services.Items {
// Host
name := makeEnvVariableName(service.ID) + "_SERVICE_HOST"
result = append(result, api.EnvVar{Name: name, Value: machine})
result = append(result, api.EnvVar{Name: name, Value: service.PortalIP})
// Port
name = makeEnvVariableName(service.ID) + "_SERVICE_PORT"
result = append(result, api.EnvVar{Name: name, Value: strconv.Itoa(service.Port)})
// Docker-compatible vars.
result = append(result, makeLinkVariables(service, machine)...)
result = append(result, makeLinkVariables(service)...)
}
// The 'SERVICE_HOST' variable is deprecated.
// TODO(thockin): get rid of it once ip-per-service is in and "deployed".
result = append(result, api.EnvVar{Name: "SERVICE_HOST", Value: machine})
return result, nil
}
@ -183,8 +220,15 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Obje
return nil, errors.NewInvalid("service", srv.ID, errs)
}
return apiserver.MakeAsync(func() (runtime.Object, error) {
cur, err := rs.registry.GetService(ctx, srv.ID)
if err != nil {
return nil, err
}
// Copy over non-user fields.
srv.PortalIP = cur.PortalIP
srv.ProxyPort = cur.ProxyPort
// TODO: check to see if external load balancer status changed
err := rs.registry.UpdateService(ctx, srv)
err = rs.registry.UpdateService(ctx, srv)
if err != nil {
return nil, err
}
@ -234,7 +278,7 @@ func makeEnvVariableName(str string) string {
return strings.ToUpper(strings.Replace(str, "-", "_", -1))
}
func makeLinkVariables(service api.Service, machine string) []api.EnvVar {
func makeLinkVariables(service api.Service) []api.EnvVar {
prefix := makeEnvVariableName(service.ID)
protocol := string(api.ProtocolTCP)
if service.Protocol != "" {
@ -244,11 +288,11 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar {
return []api.EnvVar{
{
Name: prefix + "_PORT",
Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), machine, service.Port),
Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), service.PortalIP, service.Port),
},
{
Name: portPrefix,
Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), machine, service.Port),
Value: fmt.Sprintf("%s://%s:%d", strings.ToLower(protocol), service.PortalIP, service.Port),
},
{
Name: portPrefix + "_PROTO",
@ -260,7 +304,7 @@ func makeLinkVariables(service api.Service, machine string) []api.EnvVar {
},
{
Name: portPrefix + "_ADDR",
Value: machine,
Value: service.PortalIP,
},
}
}

View File

@ -18,6 +18,7 @@ package service
import (
"fmt"
"net"
"reflect"
"testing"
@ -29,11 +30,19 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func makeIPNet(t *testing.T) *net.IPNet {
_, net, err := net.ParseCIDR("1.2.3.0/24")
if err != nil {
t.Error(err)
}
return net
}
func TestServiceRegistryCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -49,6 +58,12 @@ func TestServiceRegistryCreate(t *testing.T) {
if created_service.CreationTimestamp.IsZero() {
t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp)
}
if created_service.PortalIP != "1.2.3.1" {
t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP)
}
if created_service.ProxyPort != 0 {
t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort)
}
if len(fakeCloud.Calls) != 0 {
t.Errorf("Unexpected call(s): %#v", fakeCloud.Calls)
}
@ -63,7 +78,7 @@ func TestServiceRegistryCreate(t *testing.T) {
func TestServiceStorageValidatesCreate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
storage := NewREST(registry, nil, nil)
storage := NewREST(registry, nil, nil, makeIPNet(t))
failureCases := map[string]api.Service{
"empty ID": {
Port: 6502,
@ -96,7 +111,7 @@ func TestServiceRegistryUpdate(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz1"},
})
storage := NewREST(registry, nil, nil)
storage := NewREST(registry, nil, nil, makeIPNet(t))
c, err := storage.Update(ctx, &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -126,7 +141,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) {
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
})
storage := NewREST(registry, nil, nil)
storage := NewREST(registry, nil, nil, makeIPNet(t))
failureCases := map[string]api.Service{
"empty ID": {
Port: 6502,
@ -155,7 +170,7 @@ func TestServiceRegistryExternalService(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -182,7 +197,7 @@ func TestServiceRegistryExternalServiceError(t *testing.T) {
Err: fmt.Errorf("test error"),
}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
@ -205,7 +220,7 @@ func TestServiceRegistryDelete(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@ -226,7 +241,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@@ -253,18 +268,21 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) {
Selector: map[string]string{"bar": "baz"},
Port: 8080,
Protocol: "TCP",
PortalIP: "1.2.3.4",
},
{
TypeMeta: api.TypeMeta{ID: "abc-123"},
Selector: map[string]string{"bar": "baz"},
Port: 8081,
Protocol: "UDP",
PortalIP: "5.6.7.8",
},
{
TypeMeta: api.TypeMeta{ID: "q-u-u-x"},
Selector: map[string]string{"bar": "baz"},
Port: 8082,
Protocol: "",
PortalIP: "9.8.7.6",
},
},
}
@@ -274,28 +292,27 @@ func TestServiceRegistryMakeLinkVariables(t *testing.T) {
t.Errorf("Unexpected err: %v", err)
}
expected := []api.EnvVar{
{Name: "FOO_BAR_SERVICE_HOST", Value: "machine"},
{Name: "FOO_BAR_SERVICE_HOST", Value: "1.2.3.4"},
{Name: "FOO_BAR_SERVICE_PORT", Value: "8080"},
{Name: "FOO_BAR_PORT", Value: "tcp://machine:8080"},
{Name: "FOO_BAR_PORT_8080_TCP", Value: "tcp://machine:8080"},
{Name: "FOO_BAR_PORT", Value: "tcp://1.2.3.4:8080"},
{Name: "FOO_BAR_PORT_8080_TCP", Value: "tcp://1.2.3.4:8080"},
{Name: "FOO_BAR_PORT_8080_TCP_PROTO", Value: "tcp"},
{Name: "FOO_BAR_PORT_8080_TCP_PORT", Value: "8080"},
{Name: "FOO_BAR_PORT_8080_TCP_ADDR", Value: "machine"},
{Name: "ABC_123_SERVICE_HOST", Value: "machine"},
{Name: "FOO_BAR_PORT_8080_TCP_ADDR", Value: "1.2.3.4"},
{Name: "ABC_123_SERVICE_HOST", Value: "5.6.7.8"},
{Name: "ABC_123_SERVICE_PORT", Value: "8081"},
{Name: "ABC_123_PORT", Value: "udp://machine:8081"},
{Name: "ABC_123_PORT_8081_UDP", Value: "udp://machine:8081"},
{Name: "ABC_123_PORT", Value: "udp://5.6.7.8:8081"},
{Name: "ABC_123_PORT_8081_UDP", Value: "udp://5.6.7.8:8081"},
{Name: "ABC_123_PORT_8081_UDP_PROTO", Value: "udp"},
{Name: "ABC_123_PORT_8081_UDP_PORT", Value: "8081"},
{Name: "ABC_123_PORT_8081_UDP_ADDR", Value: "machine"},
{Name: "Q_U_U_X_SERVICE_HOST", Value: "machine"},
{Name: "ABC_123_PORT_8081_UDP_ADDR", Value: "5.6.7.8"},
{Name: "Q_U_U_X_SERVICE_HOST", Value: "9.8.7.6"},
{Name: "Q_U_U_X_SERVICE_PORT", Value: "8082"},
{Name: "Q_U_U_X_PORT", Value: "tcp://machine:8082"},
{Name: "Q_U_U_X_PORT_8082_TCP", Value: "tcp://machine:8082"},
{Name: "Q_U_U_X_PORT", Value: "tcp://9.8.7.6:8082"},
{Name: "Q_U_U_X_PORT_8082_TCP", Value: "tcp://9.8.7.6:8082"},
{Name: "Q_U_U_X_PORT_8082_TCP_PROTO", Value: "tcp"},
{Name: "Q_U_U_X_PORT_8082_TCP_PORT", Value: "8082"},
{Name: "Q_U_U_X_PORT_8082_TCP_ADDR", Value: "machine"},
{Name: "SERVICE_HOST", Value: "machine"},
{Name: "Q_U_U_X_PORT_8082_TCP_ADDR", Value: "9.8.7.6"},
}
if len(vars) != len(expected) {
t.Errorf("Expected %d env vars, got: %+v", len(expected), vars)
@@ -313,7 +330,7 @@ func TestServiceRegistryGet(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
-storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
registry.CreateService(ctx, &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@@ -333,7 +350,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) {
registry.Endpoints = api.Endpoints{Endpoints: []string{"foo:80"}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
-storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
registry.CreateService(ctx, &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@@ -362,7 +379,7 @@ func TestServiceRegistryList(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
-storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}))
+storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
registry.CreateService(ctx, &api.Service{
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
@@ -390,3 +407,194 @@ func TestServiceRegistryList(t *testing.T) {
t.Errorf("Unexpected resource version: %#v", sl)
}
}
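// Every NewREST call in this file now takes makeIPNet(t) as the portal IP
// range. The helper itself lies outside this hunk; a minimal sketch, assuming
// it just parses a small test CIDR (1.2.3.0/24 matches the 1.2.3.x portal IPs
// asserted throughout this file) and that "net" is imported at the top:
func makeIPNet(t *testing.T) *net.IPNet {
	_, ipnet, err := net.ParseCIDR("1.2.3.0/24")
	if err != nil {
		t.Error(err)
	}
	return ipnet
}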
func TestServiceRegistryIPAllocation(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc1 := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
ctx := api.NewDefaultContext()
c1, _ := rest.Create(ctx, svc1)
created_svc1 := <-c1
created_service_1 := created_svc1.(*api.Service)
if created_service_1.ID != "foo" {
t.Errorf("Expected foo, but got %v", created_service_1.ID)
}
if created_service_1.PortalIP != "1.2.3.1" {
t.Errorf("Unexpected PortalIP: %s", created_service_1.PortalIP)
}
svc2 := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "bar"},
Selector: map[string]string{"bar": "baz"},
}
ctx = api.NewDefaultContext()
c2, _ := rest.Create(ctx, svc2)
created_svc2 := <-c2
created_service_2 := created_svc2.(*api.Service)
if created_service_2.ID != "bar" {
t.Errorf("Expected bar, but got %v", created_service_2.ID)
}
if created_service_2.PortalIP != "1.2.3.2" { // new IP
t.Errorf("Unexpected PortalIP: %s", created_service_2.PortalIP)
}
}
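// The test above relies on portal IPs being handed out sequentially from the
// configured range. A rough sketch of that behavior, assuming a set-backed
// allocator and "fmt"/"net" imports; ipAllocator, used, allocateNext, and
// nextIP are hypothetical names, not necessarily what this commit's
// ip_allocator.go uses:
type ipAllocator struct {
	subnet *net.IPNet
	used   map[string]bool // allocated IPs, keyed by String()
}

func (a *ipAllocator) allocateNext() (net.IP, error) {
	// Walk the range starting just past the network address (1.2.3.0).
	for ip := nextIP(a.subnet.IP); a.subnet.Contains(ip); ip = nextIP(ip) {
		if !a.used[ip.String()] {
			a.used[ip.String()] = true
			return ip, nil
		}
	}
	return nil, fmt.Errorf("no free portal IPs in %s", a.subnet.String())
}

func nextIP(ip net.IP) net.IP {
	// Copy, then increment the last byte with carry.
	next := make(net.IP, len(ip))
	copy(next, ip)
	for i := len(next) - 1; i >= 0; i-- {
		next[i]++
		if next[i] != 0 {
			break
		}
	}
	return next
}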
func TestServiceRegistryIPReallocation(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc1 := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
ctx := api.NewDefaultContext()
c1, _ := rest.Create(ctx, svc1)
created_svc1 := <-c1
created_service_1 := created_svc1.(*api.Service)
if created_service_1.ID != "foo" {
t.Errorf("Expected foo, but got %v", created_service_1.ID)
}
if created_service_1.PortalIP != "1.2.3.1" {
t.Errorf("Unexpected PortalIP: %s", created_service_1.PortalIP)
}
c, _ := rest.Delete(ctx, created_service_1.ID)
<-c
svc2 := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "bar"},
Selector: map[string]string{"bar": "baz"},
}
ctx = api.NewDefaultContext()
c2, _ := rest.Create(ctx, svc2)
created_svc2 := <-c2
created_service_2 := created_svc2.(*api.Service)
if created_service_2.ID != "bar" {
t.Errorf("Expected bar, but got %v", created_service_2.ID)
}
if created_service_2.PortalIP != "1.2.3.1" { // same IP as before
t.Errorf("Unexpected PortalIP: %s", created_service_2.PortalIP)
}
}
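// "bar" receiving 1.2.3.1 again depends on Delete returning the service's
// portal IP to the pool. Continuing the hypothetical sketch above:
func (a *ipAllocator) release(ip net.IP) {
	delete(a.used, ip.String())
}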
func TestServiceRegistryIPUpdate(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
ctx := api.NewDefaultContext()
c, _ := rest.Create(ctx, svc)
created_svc := <-c
created_service := created_svc.(*api.Service)
if created_service.Port != 6502 {
t.Errorf("Expected port 6502, but got %v", created_service.Port)
}
if created_service.PortalIP != "1.2.3.1" {
t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP)
}
if created_service.ProxyPort != 0 {
t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort)
}
update := new(api.Service)
*update = *created_service
update.Port = 6503
update.PortalIP = "8.6.7.5"
update.ProxyPort = 309
c, _ = rest.Update(ctx, update)
updated_svc := <-c
updated_service := updated_svc.(*api.Service)
if updated_service.Port != 6503 {
t.Errorf("Expected port 6503, but got %v", updated_service.Port)
}
if updated_service.PortalIP != "1.2.3.1" { // unchanged, despite trying
t.Errorf("Unexpected PortalIP: %s", updated_service.PortalIP)
}
if updated_service.ProxyPort != 0 { // unchanged, despite trying
t.Errorf("Unexpected ProxyPort: %d", updated_service.ProxyPort)
}
}
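// PortalIP and ProxyPort surviving the update unchanged implies the REST
// layer treats them as system-managed fields: on Update it presumably copies
// the stored values over whatever the caller sent, along the lines of
// (field names from api.Service, flow hypothetical):
//
//	service.PortalIP = oldService.PortalIP
//	service.ProxyPort = oldService.ProxyPort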
func TestServiceRegistryIPExternalLoadBalancer(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
CreateExternalLoadBalancer: true,
}
ctx := api.NewDefaultContext()
c, _ := rest.Create(ctx, svc)
created_svc := <-c
created_service := created_svc.(*api.Service)
if created_service.Port != 6502 {
t.Errorf("Expected port 6502, but got %v", created_service.Port)
}
if created_service.PortalIP != "1.2.3.1" {
t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP)
}
if created_service.ProxyPort != 6502 {
t.Errorf("Unexpected ProxyPort: %d", created_service.ProxyPort)
}
}
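// With CreateExternalLoadBalancer set, ProxyPort comes back as the service
// port (6502) rather than 0, so traffic from the external balancer reaching
// a node on that port can be proxied to the service; without a balancer the
// portal IP alone suffices and no host port is claimed.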
func TestServiceRegistryIPReloadFromStorage(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
rest1 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc := &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
ctx := api.NewDefaultContext()
c, _ := rest1.Create(ctx, svc)
<-c
svc = &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
c, _ = rest1.Create(ctx, svc)
<-c
// This will reload from storage, finding the previous 2
rest2 := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))
svc = &api.Service{
Port: 6502,
TypeMeta: api.TypeMeta{ID: "foo"},
Selector: map[string]string{"bar": "baz"},
}
c, _ = rest2.Create(ctx, svc)
created_svc := <-c
created_service := created_svc.(*api.Service)
if created_service.PortalIP != "1.2.3.3" {
t.Errorf("Unexpected PortalIP: %s", created_service.PortalIP)
}
}
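// rest2 treating 1.2.3.1 and 1.2.3.2 as taken implies NewREST walks the
// services already in the registry and re-marks their portal IPs on
// construction. A sketch, continuing the hypothetical allocator above:
func reloadPortalIPs(a *ipAllocator, services []api.Service) {
	for i := range services {
		if ip := net.ParseIP(services[i].PortalIP); ip != nil {
			a.used[ip.String()] = true
		}
	}
}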