Merge pull request #1584 from thockin/net

Flag-compatible IP type
pull/6/head
Brendan Burns 2014-10-06 13:35:48 -07:00
commit ab6065944c
10 changed files with 74 additions and 28 deletions

View File

@ -41,7 +41,7 @@ import (
var (
port = flag.Uint("port", 8080, "The port to listen on. Default 8080")
address = flag.String("address", "127.0.0.1", "The address on the local server to listen to. Default 127.0.0.1")
address = util.IP(net.ParseIP("127.0.0.1"))
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'")
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
@ -60,6 +60,7 @@ var (
)
func init() {
flag.Var(&address, "address", "The IP address on to serve on (set to 0.0.0.0 for all interfaces)")
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated")
flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.")
flag.Var(&corsAllowedOriginList, "cors_allowed_origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
@ -133,7 +134,7 @@ func main() {
// TODO: expose same flags as client.BindClientConfigFlags but for a server
clientConfig := &client.Config{
Host: net.JoinHostPort(*address, strconv.Itoa(int(*port))),
Host: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
Version: *storageVersion,
}
client, err := client.New(clientConfig)
@ -179,7 +180,7 @@ func main() {
handler = apiserver.RecoverPanics(handler)
s := &http.Server{
Addr: net.JoinHostPort(*address, strconv.Itoa(int(*port))),
Addr: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
Handler: handler,
ReadTimeout: 5 * time.Minute,
WriteTimeout: 5 * time.Minute,

View File

@ -39,11 +39,12 @@ import (
var (
port = flag.Int("port", masterPkg.ControllerManagerPort, "The port that the controller-manager's http service runs on")
address = flag.String("address", "127.0.0.1", "The address to serve from")
address = util.IP(net.ParseIP("127.0.0.1"))
clientConfig = &client.Config{}
)
func init() {
flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
}
@ -63,7 +64,7 @@ func main() {
glog.Fatalf("Invalid API configuration: %v", err)
}
go http.ListenAndServe(net.JoinHostPort(*address, strconv.Itoa(*port)), nil)
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
endpoints := service.NewEndpointController(kubeClient)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)

View File

@ -21,6 +21,7 @@ package main
import (
"encoding/json"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
@ -154,7 +155,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], testRootDir, &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), "localhost", 10250)
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), net.ParseIP("127.0.0.1"), 10250)
}, 0)
// Kubelet (machine)
@ -165,7 +166,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], testRootDir, &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), "localhost", 10251)
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), net.ParseIP("127.0.0.1"), 10251)
}, 0)
return apiServer.URL

View File

@ -23,6 +23,7 @@ package main
import (
"flag"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
@ -54,7 +55,7 @@ var (
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest")
enableServer = flag.Bool("enable_server", true, "Enable the info server")
address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)")
address = util.IP(net.ParseIP("127.0.0.1"))
port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on")
hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.")
networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.")
@ -69,6 +70,7 @@ var (
func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated")
flag.Var(&address, "address", "The IP address for the info server to serve on (set to 0.0.0.0 for all interfaces)")
}
func getDockerEndpoint() string {
@ -199,7 +201,7 @@ func main() {
// start the kubelet server
if *enableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), *address, *port)
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(address), *port)
}, 0)
}

View File

@ -18,6 +18,7 @@ package main
import (
"flag"
"net"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -32,13 +33,14 @@ import (
var (
configFile = flag.String("configfile", "/tmp/proxy_config", "Configuration file for the proxy")
etcdServerList util.StringList
bindAddress = flag.String("bindaddress", "0.0.0.0", "The address for the proxy server to serve on (set to 0.0.0.0 or \"\" for all interfaces)")
bindAddress = util.IP(net.ParseIP("0.0.0.0"))
clientConfig = &client.Config{}
)
func init() {
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional)")
flag.Var(&bindAddress, "bind_address", "The address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)")
}
func main() {
@ -85,7 +87,7 @@ func main() {
glog.Infof("Using configuration file %s", *configFile)
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer, *bindAddress)
proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress))
// Wire proxier to handle changes to services
serviceConfig.RegisterHandler(proxier)
// And wire loadBalancer to handle changes to endpoints to services

View File

@ -47,11 +47,11 @@ type Server struct {
}
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet.
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address string, port uint) {
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address net.IP, port uint) {
glog.V(1).Infof("Starting to listen on %s:%d", address, port)
handler := NewServer(host, updates)
s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Addr: net.JoinHostPort(address.String(), strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,

View File

@ -276,7 +276,8 @@ func logTimeout(err error) bool {
return false
}
func newProxySocket(protocol api.Protocol, host string, port int) (proxySocket, error) {
func newProxySocket(protocol api.Protocol, ip net.IP, port int) (proxySocket, error) {
host := ip.String()
switch strings.ToUpper(string(protocol)) {
case "TCP":
listener, err := net.Listen("tcp", net.JoinHostPort(host, strconv.Itoa(port)))
@ -304,12 +305,12 @@ type Proxier struct {
loadBalancer LoadBalancer
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
address string
address net.IP
}
// NewProxier returns a new Proxier given a LoadBalancer and an
// address on which to listen
func NewProxier(loadBalancer LoadBalancer, address string) *Proxier {
func NewProxier(loadBalancer LoadBalancer, address net.IP) *Proxier {
return &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[string]*serviceInfo),

View File

@ -144,7 +144,7 @@ func TestTCPProxy(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
@ -162,7 +162,7 @@ func TestUDPProxy(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
@ -189,7 +189,7 @@ func TestTCPProxyStop(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
@ -217,7 +217,7 @@ func TestUDPProxyStop(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
@ -245,7 +245,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
@ -272,7 +272,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
@ -299,7 +299,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
@ -331,7 +331,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {
@ -363,7 +363,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "TCP", 0)
if err != nil {
@ -408,7 +408,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
},
})
p := NewProxier(lb, "127.0.0.1")
p := NewProxier(lb, net.ParseIP("127.0.0.1"))
proxyPort, err := p.addServiceOnUnusedPort("echo", "UDP", time.Second)
if err != nil {

37
pkg/util/net.go Normal file
View File

@ -0,0 +1,37 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"net"
)
// IP adapts net.IP for use as a flag.
type IP net.IP
func (ip IP) String() string {
return net.IP(ip).String()
}
func (ip *IP) Set(value string) error {
*ip = IP(net.ParseIP(value))
if *ip == nil {
return fmt.Errorf("invalid IP address: '%s'", value)
}
return nil
}

View File

@ -34,11 +34,12 @@ import (
var (
port = flag.Int("port", masterPkg.SchedulerPort, "The port that the scheduler's http service runs on")
address = flag.String("address", "127.0.0.1", "The address to serve from")
address = util.IP(net.ParseIP("127.0.0.1"))
clientConfig = &client.Config{}
)
func init() {
flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
}
@ -54,7 +55,7 @@ func main() {
glog.Fatalf("Invalid API configuration: %v", err)
}
go http.ListenAndServe(net.JoinHostPort(*address, strconv.Itoa(*port)), nil)
go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil)
configFactory := &factory.ConfigFactory{Client: kubeClient}
config := configFactory.Create()