mirror of https://github.com/k3s-io/k3s
Use KubeletPort reported in NodeStatus instead of cluster-wide master config, take 2.
parent
11c878e17c
commit
459131fd92
|
@ -85,7 +85,7 @@ var (
|
|||
|
||||
type fakeKubeletClient struct{}
|
||||
|
||||
func (fakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
|
||||
func (fakeKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
|
||||
return "", 0, nil, errors.New("Not Implemented")
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/capabilities"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
|
@ -107,7 +108,7 @@ type APIServer struct {
|
|||
EnableLogsSupport bool
|
||||
MasterServiceNamespace string
|
||||
RuntimeConfig util.ConfigurationMap
|
||||
KubeletConfig client.KubeletConfig
|
||||
KubeletConfig kubeletclient.KubeletClientConfig
|
||||
ClusterName string
|
||||
EnableProfiling bool
|
||||
EnableWatchCache bool
|
||||
|
@ -140,7 +141,7 @@ func NewAPIServer() *APIServer {
|
|||
StorageVersions: latest.AllPreferredGroupVersions(),
|
||||
|
||||
RuntimeConfig: make(util.ConfigurationMap),
|
||||
KubeletConfig: client.KubeletConfig{
|
||||
KubeletConfig: kubeletclient.KubeletClientConfig{
|
||||
Port: ports.KubeletPort,
|
||||
EnableHttps: true,
|
||||
HTTPTimeout: time.Duration(5) * time.Second,
|
||||
|
@ -259,6 +260,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
|
|||
// Kubelet related flags:
|
||||
fs.BoolVar(&s.KubeletConfig.EnableHttps, "kubelet-https", s.KubeletConfig.EnableHttps, "Use https for kubelet connections")
|
||||
fs.UintVar(&s.KubeletConfig.Port, "kubelet-port", s.KubeletConfig.Port, "Kubelet port")
|
||||
fs.MarkDeprecated("kubelet-port", "kubelet-port is deprecated and will be removed")
|
||||
fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout, "Timeout for kubelet operations")
|
||||
fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile, "Path to a client cert file for TLS.")
|
||||
fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile, "Path to a client key file for TLS.")
|
||||
|
@ -427,7 +429,7 @@ func (s *APIServer) Run(_ []string) error {
|
|||
// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
|
||||
proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||
|
||||
kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig)
|
||||
kubeletClient, err := kubeletclient.NewStaticKubeletClient(&s.KubeletConfig)
|
||||
if err != nil {
|
||||
glog.Fatalf("Failure to start kubelet client: %v", err)
|
||||
}
|
||||
|
|
|
@ -78,7 +78,6 @@ kube-apiserver
|
|||
--kubelet-client-certificate="": Path to a client cert file for TLS.
|
||||
--kubelet-client-key="": Path to a client key file for TLS.
|
||||
--kubelet-https[=true]: Use https for kubelet connections
|
||||
--kubelet-port=10250: Kubelet port
|
||||
--kubelet-timeout=5s: Timeout for kubelet operations
|
||||
--kubernetes-service-node-port=0: If non-zero, the Kubernetes master service (which apiserver creates/maintains) will be of type NodePort, using this as the value of the port. If zero, the Kubernetes master service will be of type ClusterIP.
|
||||
--log-flush-frequency=5s: Maximum number of seconds between log flushes
|
||||
|
@ -108,9 +107,6 @@ kube-apiserver
|
|||
--watch-cache[=true]: Enable watch caching in the apiserver
|
||||
```
|
||||
|
||||
###### Auto generated by spf13/cobra on 30-Oct-2015
|
||||
|
||||
|
||||
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
||||
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/admin/kube-apiserver.md?pixel)]()
|
||||
<!-- END MUNGE: GENERATED_ANALYTICS -->
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
"reflect"
|
||||
gruntime "runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -93,24 +92,6 @@ type Config struct {
|
|||
Burst int
|
||||
}
|
||||
|
||||
type KubeletConfig struct {
|
||||
// ToDo: Add support for different kubelet instances exposing different ports
|
||||
Port uint
|
||||
EnableHttps bool
|
||||
|
||||
// TLSClientConfig contains settings to enable transport layer security
|
||||
TLSClientConfig
|
||||
|
||||
// Server requires Bearer authentication
|
||||
BearerToken string
|
||||
|
||||
// HTTPTimeout is used by the client to timeout http requests to Kubelet.
|
||||
HTTPTimeout time.Duration
|
||||
|
||||
// Dial is a custom dialer used for the client
|
||||
Dial func(net, addr string) (net.Conn, error)
|
||||
}
|
||||
|
||||
// TLSClientConfig contains settings to enable transport layer security
|
||||
type TLSClientConfig struct {
|
||||
// Server requires TLS client certificate authentication
|
||||
|
|
|
@ -64,22 +64,3 @@ func (c *Config) transportConfig() *transport.Config {
|
|||
BearerToken: c.BearerToken,
|
||||
}
|
||||
}
|
||||
|
||||
// transportConfig converts a client config to an appropriate transport config.
|
||||
func (c *KubeletConfig) transportConfig() *transport.Config {
|
||||
cfg := &transport.Config{
|
||||
TLS: transport.TLSConfig{
|
||||
CAFile: c.CAFile,
|
||||
CAData: c.CAData,
|
||||
CertFile: c.CertFile,
|
||||
CertData: c.CertData,
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
},
|
||||
BearerToken: c.BearerToken,
|
||||
}
|
||||
if c.EnableHttps && !cfg.HasCA() {
|
||||
cfg.TLS.Insecure = true
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
|
|
@ -14,32 +14,54 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package unversioned
|
||||
package client
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/transport"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
type KubeletClientConfig struct {
|
||||
// Default port - used if no information about Kubelet port can be found in Node.NodeStatus.DaemonEndpoints.
|
||||
Port uint
|
||||
EnableHttps bool
|
||||
|
||||
// TLSClientConfig contains settings to enable transport layer security
|
||||
client.TLSClientConfig
|
||||
|
||||
// Server requires Bearer authentication
|
||||
BearerToken string
|
||||
|
||||
// HTTPTimeout is used by the client to timeout http requests to Kubelet.
|
||||
HTTPTimeout time.Duration
|
||||
|
||||
// Dial is a custom dialer used for the client
|
||||
Dial func(net, addr string) (net.Conn, error)
|
||||
}
|
||||
|
||||
// KubeletClient is an interface for all kubelet functionality
|
||||
type KubeletClient interface {
|
||||
ConnectionInfoGetter
|
||||
}
|
||||
|
||||
type ConnectionInfoGetter interface {
|
||||
GetConnectionInfo(host string) (scheme string, port uint, transport http.RoundTripper, err error)
|
||||
GetConnectionInfo(ctx api.Context, nodeName string) (scheme string, port uint, transport http.RoundTripper, err error)
|
||||
}
|
||||
|
||||
// HTTPKubeletClient is the default implementation of KubeletHealthchecker, accesses the kubelet over HTTP.
|
||||
type HTTPKubeletClient struct {
|
||||
Client *http.Client
|
||||
Config *KubeletConfig
|
||||
Config *KubeletClientConfig
|
||||
}
|
||||
|
||||
func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
|
||||
func MakeTransport(config *KubeletClientConfig) (http.RoundTripper, error) {
|
||||
tlsConfig, err := transport.TLSConfigFor(config.transportConfig())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -57,7 +79,7 @@ func MakeTransport(config *KubeletConfig) (http.RoundTripper, error) {
|
|||
}
|
||||
|
||||
// TODO: this structure is questionable, it should be using client.Config and overriding defaults.
|
||||
func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
|
||||
func NewStaticKubeletClient(config *KubeletClientConfig) (KubeletClient, error) {
|
||||
transport, err := MakeTransport(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -72,7 +94,8 @@ func NewKubeletClient(config *KubeletConfig) (KubeletClient, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (c *HTTPKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
|
||||
// In default HTTPKubeletClient ctx is unused.
|
||||
func (c *HTTPKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
|
||||
scheme := "http"
|
||||
if c.Config.EnableHttps {
|
||||
scheme = "https"
|
||||
|
@ -85,6 +108,25 @@ func (c *HTTPKubeletClient) GetConnectionInfo(host string) (string, uint, http.R
|
|||
// no kubelets.
|
||||
type FakeKubeletClient struct{}
|
||||
|
||||
func (c FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
|
||||
func (c FakeKubeletClient) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
|
||||
return "", 0, nil, errors.New("Not Implemented")
|
||||
}
|
||||
|
||||
// transportConfig converts a client config to an appropriate transport config.
|
||||
func (c *KubeletClientConfig) transportConfig() *transport.Config {
|
||||
cfg := &transport.Config{
|
||||
TLS: transport.TLSConfig{
|
||||
CAFile: c.CAFile,
|
||||
CAData: c.CAData,
|
||||
CertFile: c.CertFile,
|
||||
CertData: c.CertData,
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
},
|
||||
BearerToken: c.BearerToken,
|
||||
}
|
||||
if c.EnableHttps && !cfg.HasCA() {
|
||||
cfg.TLS.Insecure = true
|
||||
}
|
||||
return cfg
|
||||
}
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package unversioned
|
||||
package client
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
@ -22,6 +22,7 @@ import (
|
|||
"net/url"
|
||||
"testing"
|
||||
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/probe"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
)
|
||||
|
@ -40,18 +41,17 @@ func TestHTTPKubeletClient(t *testing.T) {
|
|||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
|
||||
_, err = url.Parse(testServer.URL)
|
||||
if err != nil {
|
||||
if _, err := url.Parse(testServer.URL); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewKubeletClient(t *testing.T) {
|
||||
config := &KubeletConfig{
|
||||
config := &KubeletClientConfig{
|
||||
EnableHttps: false,
|
||||
}
|
||||
|
||||
client, err := NewKubeletClient(config)
|
||||
client, err := NewStaticKubeletClient(config)
|
||||
if err != nil {
|
||||
t.Errorf("Error while trying to create a client: %v", err)
|
||||
}
|
||||
|
@ -61,17 +61,17 @@ func TestNewKubeletClient(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNewKubeletClientTLSInvalid(t *testing.T) {
|
||||
config := &KubeletConfig{
|
||||
config := &KubeletClientConfig{
|
||||
EnableHttps: true,
|
||||
//Invalid certificate and key path
|
||||
TLSClientConfig: TLSClientConfig{
|
||||
CertFile: "../testdata/mycertinvalid.cer",
|
||||
KeyFile: "../testdata/mycertinvalid.key",
|
||||
CAFile: "../testdata/myCA.cer",
|
||||
TLSClientConfig: client.TLSClientConfig{
|
||||
CertFile: "../../client/testdata/mycertinvalid.cer",
|
||||
KeyFile: "../../client/testdata/mycertinvalid.key",
|
||||
CAFile: "../../client/testdata/myCA.cer",
|
||||
},
|
||||
}
|
||||
|
||||
client, err := NewKubeletClient(config)
|
||||
client, err := NewStaticKubeletClient(config)
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error")
|
||||
}
|
||||
|
@ -81,18 +81,18 @@ func TestNewKubeletClientTLSInvalid(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestNewKubeletClientTLSValid(t *testing.T) {
|
||||
config := &KubeletConfig{
|
||||
config := &KubeletClientConfig{
|
||||
EnableHttps: true,
|
||||
TLSClientConfig: TLSClientConfig{
|
||||
CertFile: "../testdata/mycertvalid.cer",
|
||||
TLSClientConfig: client.TLSClientConfig{
|
||||
CertFile: "../../client/testdata/mycertvalid.cer",
|
||||
// TLS Configuration, only applies if EnableHttps is true.
|
||||
KeyFile: "../testdata/mycertvalid.key",
|
||||
KeyFile: "../../client/testdata/mycertvalid.key",
|
||||
// TLS Configuration, only applies if EnableHttps is true.
|
||||
CAFile: "../testdata/myCA.cer",
|
||||
CAFile: "../../client/testdata/myCA.cer",
|
||||
},
|
||||
}
|
||||
|
||||
client, err := NewKubeletClient(config)
|
||||
client, err := NewStaticKubeletClient(config)
|
||||
if err != nil {
|
||||
t.Errorf("Not expecting an error #%v", err)
|
||||
}
|
|
@ -41,8 +41,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/auth/authenticator"
|
||||
"k8s.io/kubernetes/pkg/auth/authorizer"
|
||||
"k8s.io/kubernetes/pkg/auth/handlers"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/healthz"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/registry/componentstatus"
|
||||
controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd"
|
||||
|
@ -182,7 +182,7 @@ type Config struct {
|
|||
// StorageVersions is a map between groups and their storage versions
|
||||
StorageVersions map[string]string
|
||||
EventTTL time.Duration
|
||||
KubeletClient client.KubeletClient
|
||||
KubeletClient kubeletclient.KubeletClient
|
||||
// allow downstream consumers to disable the core controller loops
|
||||
EnableCoreControllers bool
|
||||
EnableLogsSupport bool
|
||||
|
@ -533,7 +533,6 @@ func (m *Master) init(c *Config) {
|
|||
|
||||
storageDecorator := c.storageDecorator()
|
||||
dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
|
||||
podStorage := podetcd.NewStorage(dbClient("pods"), storageDecorator, c.KubeletClient, m.proxyTransport)
|
||||
|
||||
podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"), storageDecorator)
|
||||
|
||||
|
@ -555,6 +554,13 @@ func (m *Master) init(c *Config) {
|
|||
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), storageDecorator, c.KubeletClient, m.proxyTransport)
|
||||
m.nodeRegistry = node.NewRegistry(nodeStorage)
|
||||
|
||||
podStorage := podetcd.NewStorage(
|
||||
dbClient("pods"),
|
||||
storageDecorator,
|
||||
kubeletclient.ConnectionInfoGetter(nodeStorage),
|
||||
m.proxyTransport,
|
||||
)
|
||||
|
||||
serviceStorage := serviceetcd.NewREST(dbClient("services"), storageDecorator)
|
||||
m.serviceRegistry = service.NewRegistry(serviceStorage)
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/apiserver"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/registry/endpoint"
|
||||
"k8s.io/kubernetes/pkg/registry/namespace"
|
||||
"k8s.io/kubernetes/pkg/registry/registrytest"
|
||||
|
|
|
@ -17,12 +17,13 @@ limitations under the License.
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
"k8s.io/kubernetes/pkg/registry/node"
|
||||
|
@ -90,5 +91,35 @@ var _ = rest.Redirector(&REST{})
|
|||
|
||||
// ResourceLocation returns a URL to which one can send traffic for the specified node.
|
||||
func (r *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.RoundTripper, error) {
|
||||
return node.ResourceLocation(r, r.connection, r.proxyTransport, ctx, id)
|
||||
return node.ResourceLocation(r, r, r.proxyTransport, ctx, id)
|
||||
}
|
||||
|
||||
var _ = client.ConnectionInfoGetter(&REST{})
|
||||
|
||||
func (r *REST) getKubeletPort(ctx api.Context, nodeName string) (int, error) {
|
||||
// We probably shouldn't care about context when looking for Node object.
|
||||
obj, err := r.Get(ctx, nodeName)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
node, ok := obj.(*api.Node)
|
||||
if !ok {
|
||||
return 0, fmt.Errorf("Unexpected object type: %#v", node)
|
||||
}
|
||||
return node.Status.DaemonEndpoints.KubeletEndpoint.Port, nil
|
||||
}
|
||||
|
||||
func (c *REST) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
|
||||
scheme, port, transport, err := c.connection.GetConnectionInfo(ctx, nodeName)
|
||||
if err != nil {
|
||||
return "", 0, nil, err
|
||||
}
|
||||
daemonPort, err := c.getKubeletPort(ctx, nodeName)
|
||||
if err != nil {
|
||||
return "", 0, nil, err
|
||||
}
|
||||
if daemonPort > 0 {
|
||||
return scheme, uint(daemonPort), transport, nil
|
||||
}
|
||||
return scheme, port, transport, nil
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import (
|
|||
type fakeConnectionInfoGetter struct {
|
||||
}
|
||||
|
||||
func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
|
||||
func (fakeConnectionInfoGetter) GetConnectionInfo(ctx api.Context, nodeName string) (string, uint, http.RoundTripper, error) {
|
||||
return "http", 12345, nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -26,8 +26,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
|
@ -162,9 +162,16 @@ func ResourceLocation(getter ResourceGetter, connection client.ConnectionInfoGet
|
|||
}
|
||||
host := hostIP.String()
|
||||
|
||||
if portReq == "" || strconv.Itoa(ports.KubeletPort) == portReq {
|
||||
// Ignore requested scheme, use scheme provided by GetConnectionInfo
|
||||
scheme, port, kubeletTransport, err := connection.GetConnectionInfo(host)
|
||||
// We check if we want to get a default Kubelet's transport. It happens if either:
|
||||
// - no port is specified in request (Kubelet's port is default),
|
||||
// - we're using Port stored as a DaemonEndpoint and requested port is a Kubelet's port stored in the DaemonEndpoint,
|
||||
// - there's no information in the API about DaemonEnpoint (legacy cluster) and requested port is equal to ports.KubeletPort (cluster-wide config)
|
||||
kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
|
||||
if kubeletPort == 0 {
|
||||
kubeletPort = ports.KubeletPort
|
||||
}
|
||||
if portReq == "" || strconv.Itoa(kubeletPort) == portReq {
|
||||
scheme, port, kubeletTransport, err := connection.GetConnectionInfo(ctx, node.Name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -26,8 +26,8 @@ import (
|
|||
etcderr "k8s.io/kubernetes/pkg/api/errors/etcd"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
|
@ -57,7 +57,12 @@ type REST struct {
|
|||
}
|
||||
|
||||
// NewStorage returns a RESTStorage object that will work against pods.
|
||||
func NewStorage(s storage.Interface, storageDecorator generic.StorageDecorator, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper) PodStorage {
|
||||
func NewStorage(
|
||||
s storage.Interface,
|
||||
storageDecorator generic.StorageDecorator,
|
||||
k client.ConnectionInfoGetter,
|
||||
proxyTransport http.RoundTripper,
|
||||
) PodStorage {
|
||||
prefix := "/pods"
|
||||
|
||||
newListFunc := func() runtime.Object { return &api.PodList{} }
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
genericrest "k8s.io/kubernetes/pkg/registry/generic/rest"
|
||||
"k8s.io/kubernetes/pkg/registry/pod"
|
||||
|
@ -32,8 +32,8 @@ import (
|
|||
|
||||
// LogREST implements the log endpoint for a Pod
|
||||
type LogREST struct {
|
||||
Store *etcdgeneric.Etcd
|
||||
KubeletConn client.ConnectionInfoGetter
|
||||
Store *etcdgeneric.Etcd
|
||||
}
|
||||
|
||||
// LogREST implements GetterWithOptions
|
||||
|
@ -49,7 +49,7 @@ func (r *LogREST) New() runtime.Object {
|
|||
func (r *LogREST) Get(ctx api.Context, name string, opts runtime.Object) (runtime.Object, error) {
|
||||
logOpts, ok := opts.(*api.PodLogOptions)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Invalid options object: %#v", opts)
|
||||
return nil, fmt.Errorf("invalid options object: %#v", opts)
|
||||
}
|
||||
if errs := validation.ValidatePodLogOptions(logOpts); len(errs) > 0 {
|
||||
return nil, errors.NewInvalid("podlogs", name, errs)
|
||||
|
|
|
@ -25,7 +25,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/rest"
|
||||
"k8s.io/kubernetes/pkg/capabilities"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
|
||||
genericrest "k8s.io/kubernetes/pkg/registry/generic/rest"
|
||||
"k8s.io/kubernetes/pkg/registry/pod"
|
||||
|
|
|
@ -27,8 +27,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/registry/generic"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
|
@ -233,7 +233,13 @@ func ResourceLocation(getter ResourceGetter, rt http.RoundTripper, ctx api.Conte
|
|||
|
||||
// LogLocation returns the log URL for a pod container. If opts.Container is blank
|
||||
// and only one container is present in the pod, that container is used.
|
||||
func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodLogOptions) (*url.URL, http.RoundTripper, error) {
|
||||
func LogLocation(
|
||||
getter ResourceGetter,
|
||||
connInfo client.ConnectionInfoGetter,
|
||||
ctx api.Context,
|
||||
name string,
|
||||
opts *api.PodLogOptions,
|
||||
) (*url.URL, http.RoundTripper, error) {
|
||||
pod, err := getPod(getter, ctx, name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -258,7 +264,7 @@ func LogLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ct
|
|||
// If pod has not been assigned a host, return an empty location
|
||||
return nil, nil, nil
|
||||
}
|
||||
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost)
|
||||
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -341,17 +347,37 @@ func streamParams(params url.Values, opts runtime.Object) error {
|
|||
|
||||
// AttachLocation returns the attach URL for a pod container. If opts.Container is blank
|
||||
// and only one container is present in the pod, that container is used.
|
||||
func AttachLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodAttachOptions) (*url.URL, http.RoundTripper, error) {
|
||||
func AttachLocation(
|
||||
getter ResourceGetter,
|
||||
connInfo client.ConnectionInfoGetter,
|
||||
ctx api.Context,
|
||||
name string,
|
||||
opts *api.PodAttachOptions,
|
||||
) (*url.URL, http.RoundTripper, error) {
|
||||
return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "attach")
|
||||
}
|
||||
|
||||
// ExecLocation returns the exec URL for a pod container. If opts.Container is blank
|
||||
// and only one container is present in the pod, that container is used.
|
||||
func ExecLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts *api.PodExecOptions) (*url.URL, http.RoundTripper, error) {
|
||||
func ExecLocation(
|
||||
getter ResourceGetter,
|
||||
connInfo client.ConnectionInfoGetter,
|
||||
ctx api.Context,
|
||||
name string,
|
||||
opts *api.PodExecOptions,
|
||||
) (*url.URL, http.RoundTripper, error) {
|
||||
return streamLocation(getter, connInfo, ctx, name, opts, opts.Container, "exec")
|
||||
}
|
||||
|
||||
func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string, opts runtime.Object, container, path string) (*url.URL, http.RoundTripper, error) {
|
||||
func streamLocation(
|
||||
getter ResourceGetter,
|
||||
connInfo client.ConnectionInfoGetter,
|
||||
ctx api.Context,
|
||||
name string,
|
||||
opts runtime.Object,
|
||||
container,
|
||||
path string,
|
||||
) (*url.URL, http.RoundTripper, error) {
|
||||
pod, err := getPod(getter, ctx, name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -375,7 +401,7 @@ func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter,
|
|||
// If pod has not been assigned a host, return an empty location
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
|
||||
}
|
||||
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost)
|
||||
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -393,7 +419,12 @@ func streamLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter,
|
|||
}
|
||||
|
||||
// PortForwardLocation returns the port-forward URL for a pod.
|
||||
func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGetter, ctx api.Context, name string) (*url.URL, http.RoundTripper, error) {
|
||||
func PortForwardLocation(
|
||||
getter ResourceGetter,
|
||||
connInfo client.ConnectionInfoGetter,
|
||||
ctx api.Context,
|
||||
name string,
|
||||
) (*url.URL, http.RoundTripper, error) {
|
||||
pod, err := getPod(getter, ctx, name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -404,7 +435,7 @@ func PortForwardLocation(getter ResourceGetter, connInfo client.ConnectionInfoGe
|
|||
// If pod has not been assigned a host, return an empty location
|
||||
return nil, nil, errors.NewBadRequest(fmt.Sprintf("pod %s does not have a host assigned", name))
|
||||
}
|
||||
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(nodeHost)
|
||||
nodeScheme, nodePort, nodeTransport, err := connInfo.GetConnectionInfo(ctx, nodeHost)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/auth/authorizer"
|
||||
"k8s.io/kubernetes/pkg/auth/authorizer/abac"
|
||||
"k8s.io/kubernetes/pkg/auth/user"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/admit"
|
||||
"k8s.io/kubernetes/plugin/pkg/auth/authenticator/token/tokentest"
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/apiserver"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
@ -214,7 +215,7 @@ func TestSchedulerExtender(t *testing.T) {
|
|||
|
||||
m = master.New(&master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableCoreControllers: true,
|
||||
EnableLogsSupport: false,
|
||||
EnableUISupport: false,
|
||||
|
|
|
@ -36,6 +36,7 @@ import (
|
|||
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
|
||||
|
@ -143,7 +144,7 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
|
|||
masterConfig = &master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
StorageVersions: storageVersions,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableLogsSupport: false,
|
||||
EnableProfiling: true,
|
||||
EnableSwaggerSupport: true,
|
||||
|
@ -285,7 +286,7 @@ func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) {
|
|||
|
||||
m := master.New(&master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableLogsSupport: false,
|
||||
EnableProfiling: true,
|
||||
EnableUISupport: false,
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
@ -83,7 +84,7 @@ func TestUnschedulableNodes(t *testing.T) {
|
|||
|
||||
m = master.New(&master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableCoreControllers: true,
|
||||
EnableLogsSupport: false,
|
||||
EnableUISupport: false,
|
||||
|
@ -334,7 +335,7 @@ func BenchmarkScheduling(b *testing.B) {
|
|||
|
||||
m = master.New(&master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableCoreControllers: true,
|
||||
EnableLogsSupport: false,
|
||||
EnableUISupport: false,
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/apiserver"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/admit"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
|
@ -73,7 +74,7 @@ func TestSecrets(t *testing.T) {
|
|||
|
||||
m = master.New(&master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableCoreControllers: true,
|
||||
EnableLogsSupport: false,
|
||||
EnableUISupport: false,
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/controller/serviceaccount"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/master"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
@ -424,7 +425,7 @@ func startServiceAccountTestServer(t *testing.T) (*client.Client, client.Config,
|
|||
// Create a master and install handlers into mux.
|
||||
m = master.New(&master.Config{
|
||||
StorageDestinations: storageDestinations,
|
||||
KubeletClient: client.FakeKubeletClient{},
|
||||
KubeletClient: kubeletclient.FakeKubeletClient{},
|
||||
EnableLogsSupport: false,
|
||||
EnableUISupport: false,
|
||||
EnableIndex: true,
|
||||
|
|
Loading…
Reference in New Issue