Use KubeletPort reporeted in NodeStatus instead of cluster-wide master config.

pull/6/head
gmarek 2015-08-13 16:05:32 +02:00
parent e9dc7306cc
commit 6d6cd8e46a
14 changed files with 193 additions and 46 deletions

View File

@ -46,6 +46,7 @@ CLUSTER_IP_RANGE="${CLUSTER_IP_RANGE:-10.244.0.0/16}"
MINION_SCOPES="${MINION_SCOPES:-compute-rw,monitoring,logging-write,storage-ro}"
RUNTIME_CONFIG="${KUBE_RUNTIME_CONFIG:-}"
ENABLE_EXPERIMENTAL_API="${KUBE_ENABLE_EXPERIMENTAL_API:-false}"
KUBELET_PORT="${KUBELET_PORT:-10250}"
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
POLL_SLEEP_INTERVAL="${POLL_SLEEP_INTERVAL:-3}"

View File

@ -48,6 +48,7 @@ MINION_SCOPES="${MINION_SCOPES:-compute-rw,monitoring,logging-write,storage-ro}"
RUNTIME_CONFIG="${KUBE_RUNTIME_CONFIG:-}"
ENABLE_EXPERIMENTAL_API="${KUBE_ENABLE_EXPERIMENTAL_API:-false}"
TERMINATED_POD_GC_THRESHOLD=${TERMINATED_POD_GC_THRESHOLD:-100}
KUBELET_PORT="${KUBELET_PORT:-10250}"
# Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default.
POLL_SLEEP_INTERVAL=3

View File

@ -284,6 +284,7 @@ opencontrail_public_subnet: '$(echo "$OPENCONTRAIL_PUBLIC_SUBNET")'
enable_manifest_url: '$(echo "$ENABLE_MANIFEST_URL" | sed -e "s/'/''/g")'
manifest_url: '$(echo "$MANIFEST_URL" | sed -e "s/'/''/g")'
manifest_url_header: '$(echo "$MANIFEST_URL_HEADER" | sed -e "s/'/''/g")'
kubelet_port: '$(echo "$KUBELET_PORT")'
EOF
if [ -n "${APISERVER_TEST_ARGS:-}" ]; then

View File

@ -60,6 +60,7 @@ NETWORK_PROVIDER: $(yaml-quote ${NETWORK_PROVIDER:-})
OPENCONTRAIL_TAG: $(yaml-quote ${OPENCONTRAIL_TAG:-})
OPENCONTRAIL_KUBERNETES_TAG: $(yaml-quote ${OPENCONTRAIL_KUBERNETES_TAG:-})
OPENCONTRAIL_PUBLIC_SUBNET: $(yaml-quote ${OPENCONTRAIL_PUBLIC_SUBNET:-})
KUBELET_PORT: $(yaml-quote ${KUBELET_PORT})
EOF
if [ -n "${KUBE_APISERVER_REQUEST_TIMEOUT:-}" ]; then
cat >>$file <<EOF

View File

@ -116,5 +116,10 @@
{% set network_plugin = "--network-plugin=opencontrail" %}
{% endif -%}
{% set kubelet_port = "--port=10250" -%}
{% if pillar['kubelet_port'] is defined -%}
{% set kubelet_port="--port=" + pillar['kubelet_port'] %}
{% endif -%}
# test_args has to be kept at the end, so they'll overwrite any prior configuration
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} {{manifest_url}} --allow-privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}} {{ master_kubelet_args }} {{cpu_cfs_quota}} {{network_plugin}} {{test_args}}"
DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} {{manifest_url}} --allow-privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}} {{ master_kubelet_args }} {{cpu_cfs_quota}} {{network_plugin}} {{kubelet_port}} {{test_args}}"

View File

@ -240,6 +240,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
// Kubelet related flags:
fs.BoolVar(&s.KubeletConfig.EnableHttps, "kubelet-https", s.KubeletConfig.EnableHttps, "Use https for kubelet connections")
fs.UintVar(&s.KubeletConfig.Port, "kubelet-port", s.KubeletConfig.Port, "Kubelet port")
fs.MarkDeprecated("kubelet-port", "kubelet-port is deprecated and will be removed")
fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout, "Timeout for kubelet operations")
fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile, "Path to a client cert file for TLS.")
fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile, "Path to a client key file for TLS.")

View File

@ -40,8 +40,7 @@ func TestHTTPKubeletClient(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
_, err = url.Parse(testServer.URL)
if err != nil {
if _, err := url.Parse(testServer.URL); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

View File

@ -538,7 +538,6 @@ func (m *Master) init(c *Config) {
healthzChecks := []healthz.HealthzChecker{}
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
@ -560,6 +559,14 @@ func (m *Master) init(c *Config) {
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient, m.proxyTransport)
m.nodeRegistry = node.NewRegistry(nodeStorage)
podStorage := podetcd.NewStorage(
dbClient("pods"),
nodeStorage,
c.EnableWatchCache,
c.KubeletClient,
m.proxyTransport,
)
serviceStorage := serviceetcd.NewREST(dbClient("services"))
m.serviceRegistry = service.NewRegistry(serviceStorage)

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd
import (
"fmt"
"net/http"
"net/url"
@ -102,3 +103,25 @@ var _ = rest.Redirector(&REST{})
func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
return node.ResourceLocation(r, r.connection, r.proxyTransport, ctx, id)
}
// An interface for types that allow getting information about a Node on which give pod is running.
type HostLocator interface {
HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error)
}
var _ = HostLocator(&REST{})
func (r *REST) HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error) {
obj, err := r.Get(ctx, pod.Spec.NodeName)
if err != nil {
return 0, err
}
node := obj.(*api.Node)
if node == nil {
return 0, fmt.Errorf("Unexpected object type: %#v", node)
}
if node.Status.DaemonEndpoints.KubeletEndpoint.Port == 0 {
return -1, nil
}
return node.Status.DaemonEndpoints.KubeletEndpoint.Port, nil
}

View File

@ -153,17 +153,30 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
}
host := hostIP.String()
if portReq == "" || strconv.Itoa(ports.KubeletPort) == portReq {
// Ignore requested scheme, use scheme provided by GetConnectionInfo
// We check if we want to get a default Kubelet's transport. It happens if either:
// - no port is specified in request (Kubelet's port is default),
// - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint,
// - there's no information in the API about DaemonEnpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
defaultKubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
if defaultKubeletPort == 0 {
defaultKubeletPort = ports.KubeletPort
}
if portReq == "" || strconv.Itoa(defaultKubeletPort) == portReq {
scheme, port, kubeletTransport, err := connection.GetConnectionInfo(host)
if err != nil {
return nil, nil, err
}
var portString string
if node.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {
portString = strconv.Itoa(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
} else {
portString = strconv.FormatUint(uint64(port), 10)
}
return &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(
host,
strconv.FormatUint(uint64(port), 10),
portString,
),
},
kubeletTransport,

View File

@ -59,8 +59,19 @@ type REST struct {
proxyTransport http.RoundTripper
}
// Defined in pkg/registry/node/etcd/etcd.go
type HostLocator interface {
HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error)
}
// NewStorage returns a RESTStorage object that will work against pods.
func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
func NewStorage(
s storage.Interface,
hostLocator HostLocator,
useCacher bool,
k client.ConnectionInfoGetter,
proxyTransport http.RoundTripper,
) PodStorage {
prefix := "/pods"
storageInterface := s
@ -110,11 +121,11 @@ func NewStorage(s storage.Interface, useCacher bool, k client.ConnectionInfoGett
Pod: &REST{store, proxyTransport},
Binding: &BindingREST{store: store},
Status: &StatusREST{store: &statusStore},
Log: &podrest.LogREST{Store: store, KubeletConn: k},
Log: &podrest.LogREST{Store: store, HostLocator: hostLocator, KubeletConn: k},
Proxy: &ProxyREST{store: store, proxyTransport: proxyTransport},
Exec: &ExecREST{store: store, kubeletConn: k},
Attach: &AttachREST{store: store, kubeletConn: k},
PortForward: &PortForwardREST{store: store, kubeletConn: k},
Exec: &ExecREST{store: store, hostLocator: hostLocator, kubeletConn: k},
Attach: &AttachREST{store: store, hostLocator: hostLocator, kubeletConn: k},
PortForward: &PortForwardREST{store: store, hostLocator: hostLocator, kubeletConn: k},
}
}
@ -262,6 +273,7 @@ var upgradeableMethods = []string{"GET", "POST"}
type AttachREST struct {
store *etcdgeneric.Etcd
kubeletConn client.ConnectionInfoGetter
hostLocator HostLocator
}
// Implement Connecter
@ -278,10 +290,14 @@ func (r *AttachREST) Connect(ctx api.Context, name string, opts runtime.Object,
if !ok {
return nil, fmt.Errorf("Invalid options object: %#v", opts)
}
location, transport, err := pod.AttachLocation(r.store, r.kubeletConn, ctx, name, attachOpts)
location, transport, err := pod.AttachLocation(r.store, r.kubeletConn, ctx, name, attachOpts, r.hostLocator)
if err != nil {
return nil, err
}
if location.Host == "" {
return nil, fmt.Errorf("Empty location.Host in %#v", location)
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}
@ -300,6 +316,7 @@ func (r *AttachREST) ConnectMethods() []string {
type ExecREST struct {
store *etcdgeneric.Etcd
kubeletConn client.ConnectionInfoGetter
hostLocator HostLocator
}
// Implement Connecter
@ -316,10 +333,14 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object, re
if !ok {
return nil, fmt.Errorf("invalid options object: %#v", opts)
}
location, transport, err := pod.ExecLocation(r.store, r.kubeletConn, ctx, name, execOpts)
location, transport, err := pod.ExecLocation(r.store, r.kubeletConn, ctx, name, execOpts, r.hostLocator)
if err != nil {
return nil, err
}
if location.Host == "" {
return nil, fmt.Errorf("Empty location.Host in %#v", location)
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}
@ -337,6 +358,7 @@ func (r *ExecREST) ConnectMethods() []string {
// TODO: move me into pod/rest - I'm generic to store type via ResourceGetter
type PortForwardREST struct {
store *etcdgeneric.Etcd
hostLocator HostLocator
kubeletConn client.ConnectionInfoGetter
}
@ -360,10 +382,14 @@ func (r *PortForwardREST) ConnectMethods() []string {
// Connect returns a handler for the pod portforward proxy
func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
location, transport, err := pod.PortForwardLocation(r.store, r.kubeletConn, ctx, name)
location, transport, err := pod.PortForwardLocation(r.store, r.kubeletConn, ctx, name, r.hostLocator)
if err != nil {
return nil, err
}
if location.Host == "" {
return nil, fmt.Errorf("Empty location.Host in %#v", location)
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
nodeetcd "k8s.io/kubernetes/pkg/registry/node/etcd"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
@ -36,10 +37,11 @@ import (
"k8s.io/kubernetes/pkg/util"
)
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient) {
func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, PodStorage) {
etcdStorage, fakeClient := registrytest.NewEtcdStorage(t, "")
storage := NewStorage(etcdStorage, false, nil, nil)
return storage.Pod, storage.Binding, storage.Status, fakeClient
nodeREST, _ := nodeetcd.NewREST(etcdStorage, false, nil, nil)
storage := NewStorage(etcdStorage, nodeREST, false, nil, nil)
return storage.Pod, storage.Binding, storage.Status, fakeClient, storage
}
func validNewPod() *api.Pod {
@ -79,7 +81,7 @@ func validChangedPod() *api.Pod {
}
func TestCreate(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
pod := validNewPod()
pod.ObjectMeta = api.ObjectMeta{}
@ -104,7 +106,7 @@ func TestCreate(t *testing.T) {
}
func TestUpdate(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestUpdate(
// valid
@ -119,7 +121,7 @@ func TestUpdate(t *testing.T) {
}
func TestDelete(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd).ReturnDeletedObject()
test.TestDelete(validNewPod())
@ -129,7 +131,7 @@ func TestDelete(t *testing.T) {
}
func TestCreateRegistryError(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
fakeClient.Err = fmt.Errorf("test error")
pod := validNewPod()
@ -140,7 +142,7 @@ func TestCreateRegistryError(t *testing.T) {
}
func TestCreateSetsFields(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
pod := validNewPod()
_, err := storage.Create(api.NewDefaultContext(), pod)
if err != fakeClient.Err {
@ -254,7 +256,7 @@ func TestResourceLocation(t *testing.T) {
ctx := api.NewDefaultContext()
for _, tc := range testCases {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
key, _ := storage.KeyFunc(ctx, tc.pod.Name)
key = etcdtest.AddPrefix(key)
if _, err := fakeClient.Set(key, runtime.EncodeOrDie(testapi.Default.Codec(), &tc.pod), 0); err != nil {
@ -280,19 +282,19 @@ func TestResourceLocation(t *testing.T) {
}
func TestGet(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestGet(validNewPod())
}
func TestList(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestList(validNewPod())
}
func TestWatch(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
test := registrytest.New(t, fakeClient, storage.Etcd)
test.TestWatch(
validNewPod(),
@ -314,7 +316,7 @@ func TestWatch(t *testing.T) {
}
func TestEtcdCreate(t *testing.T) {
storage, bindingStorage, _, fakeClient := newStorage(t)
storage, bindingStorage, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
key, _ := storage.KeyFunc(ctx, "foo")
@ -352,7 +354,7 @@ func TestEtcdCreate(t *testing.T) {
// Ensure that when scheduler creates a binding for a pod that has already been deleted
// by the API server, API server returns not-found error.
func TestEtcdCreateBindingNoPod(t *testing.T) {
storage, bindingStorage, _, fakeClient := newStorage(t)
storage, bindingStorage, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
@ -384,7 +386,7 @@ func TestEtcdCreateBindingNoPod(t *testing.T) {
}
func TestEtcdCreateFailsWithoutNamespace(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
fakeClient.TestIndex = true
pod := validNewPod()
pod.Namespace = ""
@ -396,7 +398,7 @@ func TestEtcdCreateFailsWithoutNamespace(t *testing.T) {
}
func TestEtcdCreateWithContainersNotFound(t *testing.T) {
storage, bindingStorage, _, fakeClient := newStorage(t)
storage, bindingStorage, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
key, _ := storage.KeyFunc(ctx, "foo")
@ -439,7 +441,7 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
}
func TestEtcdCreateWithConflict(t *testing.T) {
storage, bindingStorage, _, fakeClient := newStorage(t)
storage, bindingStorage, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
key, _ := storage.KeyFunc(ctx, "foo")
@ -471,7 +473,7 @@ func TestEtcdCreateWithConflict(t *testing.T) {
}
func TestEtcdCreateWithExistingContainers(t *testing.T) {
storage, bindingStorage, _, fakeClient := newStorage(t)
storage, bindingStorage, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
key, _ := storage.KeyFunc(ctx, "foo")
@ -550,7 +552,7 @@ func TestEtcdCreateBinding(t *testing.T) {
},
}
for k, test := range testCases {
storage, bindingStorage, _, fakeClient := newStorage(t)
storage, bindingStorage, _, fakeClient, _ := newStorage(t)
key, _ := storage.KeyFunc(ctx, "foo")
key = etcdtest.AddPrefix(key)
fakeClient.ExpectNotFoundGet(key)
@ -573,7 +575,7 @@ func TestEtcdCreateBinding(t *testing.T) {
}
func TestEtcdUpdateNotScheduled(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
@ -598,7 +600,7 @@ func TestEtcdUpdateNotScheduled(t *testing.T) {
}
func TestEtcdUpdateScheduled(t *testing.T) {
storage, _, _, fakeClient := newStorage(t)
storage, _, _, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true
@ -664,7 +666,7 @@ func TestEtcdUpdateScheduled(t *testing.T) {
}
func TestEtcdUpdateStatus(t *testing.T) {
storage, _, statusStorage, fakeClient := newStorage(t)
storage, _, statusStorage, fakeClient, _ := newStorage(t)
ctx := api.NewDefaultContext()
fakeClient.TestIndex = true

View File

@ -30,11 +30,17 @@ import (
"k8s.io/kubernetes/pkg/runtime"
)
// Defined in pkg/registry/node/etcd/etcd.go
type HostLocator interface {
HostKubeletPort(pod *api.Pod, ctx api.Context) (int, error)
}
// LogREST implements the log endpoint for a Pod
// TODO: move me into pod/rest - I'm generic to store type via ResourceGetter
type LogREST struct {
Store *etcdgeneric.Etcd
HostLocator HostLocator
KubeletConn client.ConnectionInfoGetter
Store *etcdgeneric.Etcd
}
// LogREST implements GetterWithOptions
@ -55,10 +61,13 @@ func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtim
if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
return nil, errors.NewInvalid("podlogs", name, errs)
}
location, transport, err := pod.LogLocation(r.Store, r.KubeletConn, ctx, name, logOpts)
location, transport, err := pod.LogLocation(r.Store, r.KubeletConn, ctx, name, logOpts, r.HostLocator)
if err != nil {
return nil, err
}
if location.Host == "" {
return nil, fmt.Errorf("Empty location.Host in %#v", location)
}
return &genericrest.LocationStreamer{
Location: location,
Transport: transport,

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
nodeetcd "k8s.io/kubernetes/pkg/registry/node/etcd"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/fielderrors"
@ -224,7 +225,14 @@ func ResourceLocation(getter ResourceGetter, rt http.RoundTripper, ctx api.Conte
// LogLocation returns the log URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) {
func LogLocation(
getter ResourceGetter,
connInfo client.ConnectionInfoGetter,
ctx api.Context,
name string,
opts *api.PodLogOptions,
hostLocator nodeetcd.HostLocator,
) (*url.URL, http.RoundTripper, error) {
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
@ -248,6 +256,13 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct
if err != nil {
return nil, nil, err
}
daemonPort, err := hostLocator.HostKubeletPort(pod, ctx)
if err != nil {
return nil, nil, err
}
if daemonPort > 0 {
nodePort = uint(daemonPort)
}
params := url.Values{}
if opts.Follow {
params.Add("follow", "true")
@ -318,17 +333,40 @@ func streamParams(params url.Values, opts runtime.Object) error {
// AttachLocation returns the attach URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
func AttachLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodAttachOptions) (*url.URL, http.RoundTripper, error) {
return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "attach")
func AttachLocation(
getter ResourceGetter,
connInfo client.ConnectionInfoGetter,
ctx api.Context,
name string,
opts *api.PodAttachOptions,
hostLocator nodeetcd.HostLocator,
) (*url.URL, http.RoundTripper, error) {
return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "attach", hostLocator)
}
// ExecLocation returns the exec URL for a pod container. If opts.Container is blank
// and only one container is present in the pod, that container is used.
func ExecLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodExecOptions) (*url.URL, http.RoundTripper, error) {
return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "exec")
func ExecLocation(
getter ResourceGetter,
connInfo client.ConnectionInfoGetter,
ctx api.Context,
name string,
opts *api.PodExecOptions,
hostLocator nodeetcd.HostLocator,
) (*url.URL, http.RoundTripper, error) {
return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "exec", hostLocator)
}
func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts runtime.Object, container, path string) (*url.URL, http.RoundTripper, error) {
func streamLocation(
getter ResourceGetter,
connInfo client.ConnectionInfoGetter,
ctx api.Context,
name string,
opts runtime.Object,
container,
path string,
hostLocator nodeetcd.HostLocator,
) (*url.URL, http.RoundTripper, error) {
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
@ -351,6 +389,13 @@ func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter,
if err != nil {
return nil, nil, err
}
daemonPort, err := hostLocator.HostKubeletPort(pod, ctx)
if err != nil {
return nil, nil, err
}
if daemonPort > 0 {
nodePort = uint(daemonPort)
}
params := url.Values{}
if err := streamParams(params, opts); err != nil {
return nil, nil, err
@ -365,7 +410,13 @@ func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter,
}
// PortForwardLocation returns the port-forward URL for a pod.
func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string) (*url.URL, http.RoundTripper, error) {
func PortForwardLocation(
getter ResourceGetter,
connInfo client.ConnectionInfoGetter,
ctx api.Context,
name string,
hostLocator nodeetcd.HostLocator,
) (*url.URL, http.RoundTripper, error) {
pod, err := getPod(getter, ctx, name)
if err != nil {
return nil, nil, err
@ -380,6 +431,13 @@ func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGe
if err != nil {
return nil, nil, err
}
daemonPort, err := hostLocator.HostKubeletPort(pod, ctx)
if err != nil {
return nil, nil, err
}
if daemonPort > 0 {
nodePort = uint(daemonPort)
}
loc := &url.URL{
Scheme: nodeScheme,
Host: fmt.Sprintf("%s:%d", nodeHost, nodePort),