Merge remote-tracking branch 'origin/master' into release-1.14

pull/564/head
Hannes Hoerl 2019-03-12 09:22:01 +00:00
commit b1e389e6f7
57 changed files with 905 additions and 121 deletions

View File

@ -413,6 +413,14 @@ if [[ -n "${LOGROTATE_MAX_SIZE:-}" ]]; then
PROVIDER_VARS="${PROVIDER_VARS:-} LOGROTATE_MAX_SIZE" PROVIDER_VARS="${PROVIDER_VARS:-} LOGROTATE_MAX_SIZE"
fi fi
if [[ -n "${POD_LOG_MAX_FILE:-}" ]]; then
PROVIDER_VARS="${PROVIDER_VARS:-} POD_LOG_MAX_FILE"
fi
if [[ -n "${POD_LOG_MAX_SIZE:-}" ]]; then
PROVIDER_VARS="${PROVIDER_VARS:-} POD_LOG_MAX_SIZE"
fi
# Fluentd requirements # Fluentd requirements
# YAML exists to trigger a configuration refresh when changes are made. # YAML exists to trigger a configuration refresh when changes are made.
FLUENTD_GCP_YAML_VERSION="v3.2.0" FLUENTD_GCP_YAML_VERSION="v3.2.0"

View File

@ -432,6 +432,14 @@ if [[ -n "${LOGROTATE_MAX_SIZE:-}" ]]; then
PROVIDER_VARS="${PROVIDER_VARS:-} LOGROTATE_MAX_SIZE" PROVIDER_VARS="${PROVIDER_VARS:-} LOGROTATE_MAX_SIZE"
fi fi
if [[ -n "${POD_LOG_MAX_FILE:-}" ]]; then
PROVIDER_VARS="${PROVIDER_VARS:-} POD_LOG_MAX_FILE"
fi
if [[ -n "${POD_LOG_MAX_SIZE:-}" ]]; then
PROVIDER_VARS="${PROVIDER_VARS:-} POD_LOG_MAX_SIZE"
fi
# Fluentd requirements # Fluentd requirements
# YAML exists to trigger a configuration refresh when changes are made. # YAML exists to trigger a configuration refresh when changes are made.
FLUENTD_GCP_YAML_VERSION="v3.2.0" FLUENTD_GCP_YAML_VERSION="v3.2.0"

View File

@ -374,6 +374,21 @@ function setup-logrotate() {
} }
EOF EOF
# Configure log rotation for pod logs in /var/log/pods/NAMESPACE_NAME_UID.
cat > /etc/logrotate.d/allpodlogs <<EOF
/var/log/pods/*/*.log {
rotate ${POD_LOG_MAX_FILE:-5}
copytruncate
missingok
notifempty
compress
maxsize ${POD_LOG_MAX_SIZE:-5M}
daily
dateext
dateformat -%Y%m%d-%s
create 0644 root root
}
EOF
} }
# Finds the master PD device; returns it in MASTER_PD_DEVICE # Finds the master PD device; returns it in MASTER_PD_DEVICE

View File

@ -17,7 +17,7 @@
"containers": [ "containers": [
{ {
"name": "cluster-autoscaler", "name": "cluster-autoscaler",
"image": "k8s.gcr.io/cluster-autoscaler:v1.13.0", "image": "k8s.gcr.io/cluster-autoscaler:v1.14.0-beta.1",
"livenessProbe": { "livenessProbe": {
"httpGet": { "httpGet": {
"path": "/health-check", "path": "/health-check",

View File

@ -142,6 +142,7 @@ func NewHyperKubeCommand(stopCh <-chan struct{}) (*cobra.Command, []func() *cobr
} }
cmd.Flags().BoolVar(&makeSymlinksFlag, "make-symlinks", makeSymlinksFlag, "create a symlink for each server in current directory") cmd.Flags().BoolVar(&makeSymlinksFlag, "make-symlinks", makeSymlinksFlag, "create a symlink for each server in current directory")
cmd.Flags().MarkHidden("make-symlinks") // hide this flag from appearing in servers' usage output cmd.Flags().MarkHidden("make-symlinks") // hide this flag from appearing in servers' usage output
cmd.Flags().MarkDeprecated("make-symlinks", "This feature will be removed in a later release.")
for i := range commandFns { for i := range commandFns {
cmd.AddCommand(commandFns[i]()) cmd.AddCommand(commandFns[i]())

View File

@ -39,6 +39,7 @@ go_library(
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/options:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",

View File

@ -30,6 +30,7 @@ import (
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
@ -79,6 +80,7 @@ func createAggregatorConfig(
etcdOptions := *commandOptions.Etcd etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion) etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion, v1.SchemeGroupVersion)
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
// override MergedResourceConfig with aggregator defaults and registry // override MergedResourceConfig with aggregator defaults and registry

View File

@ -23,6 +23,8 @@ import (
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
apiextensionsoptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options" apiextensionsoptions "k8s.io/apiextensions-apiserver/pkg/cmd/server/options"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
@ -61,6 +63,7 @@ func createAPIExtensionsConfig(
etcdOptions := *commandOptions.Etcd etcdOptions := *commandOptions.Etcd
etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) etcdOptions.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion) etcdOptions.StorageConfig.Codec = apiextensionsapiserver.Codecs.LegacyCodec(v1beta1.SchemeGroupVersion)
etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1beta1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions} genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
// override MergedResourceConfig with apiextensions defaults and registry // override MergedResourceConfig with apiextensions defaults and registry

View File

@ -379,6 +379,11 @@ func (d *initData) SetCertificateKey(key string) {
d.certificateKey = key d.certificateKey = key
} }
// SkipCertificateKeyPrint returns the skipCertificateKeyPrint flag.
func (d *initData) SkipCertificateKeyPrint() bool {
return d.skipCertificateKeyPrint
}
// Cfg returns initConfiguration. // Cfg returns initConfiguration.
func (d *initData) Cfg() *kubeadmapi.InitConfiguration { func (d *initData) Cfg() *kubeadmapi.InitConfiguration {
return d.cfg return d.cfg

View File

@ -30,6 +30,7 @@ type InitData interface {
UploadCerts() bool UploadCerts() bool
CertificateKey() string CertificateKey() string
SetCertificateKey(key string) SetCertificateKey(key string)
SkipCertificateKeyPrint() bool
Cfg() *kubeadmapi.InitConfiguration Cfg() *kubeadmapi.InitConfiguration
DryRun() bool DryRun() bool
SkipTokenPrint() bool SkipTokenPrint() bool

View File

@ -33,6 +33,7 @@ var _ InitData = &testInitData{}
func (t *testInitData) UploadCerts() bool { return false } func (t *testInitData) UploadCerts() bool { return false }
func (t *testInitData) CertificateKey() string { return "" } func (t *testInitData) CertificateKey() string { return "" }
func (t *testInitData) SetCertificateKey(key string) {} func (t *testInitData) SetCertificateKey(key string) {}
func (t *testInitData) SkipCertificateKeyPrint() bool { return false }
func (t *testInitData) Cfg() *kubeadmapi.InitConfiguration { return nil } func (t *testInitData) Cfg() *kubeadmapi.InitConfiguration { return nil }
func (t *testInitData) DryRun() bool { return false } func (t *testInitData) DryRun() bool { return false }
func (t *testInitData) SkipTokenPrint() bool { return false } func (t *testInitData) SkipTokenPrint() bool { return false }

View File

@ -21,7 +21,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"k8s.io/klog"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/options" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/options"
"k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow" "k8s.io/kubernetes/cmd/kubeadm/app/cmd/phases/workflow"
cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util" cmdutil "k8s.io/kubernetes/cmd/kubeadm/app/cmd/util"
@ -40,6 +39,7 @@ func NewUploadCertsPhase() workflow.Phase {
options.CfgPath, options.CfgPath,
options.UploadCerts, options.UploadCerts,
options.CertificateKey, options.CertificateKey,
options.SkipCertificateKeyPrint,
}, },
} }
} }
@ -51,7 +51,7 @@ func runUploadCerts(c workflow.RunData) error {
} }
if !data.UploadCerts() { if !data.UploadCerts() {
klog.V(1).Infoln("[upload-certs] Skipping certs upload") fmt.Printf("[upload-certs] Skipping phase. Please see --%s\n", options.UploadCerts)
return nil return nil
} }
client, err := data.Client() client, err := data.Client()
@ -70,5 +70,8 @@ func runUploadCerts(c workflow.RunData) error {
if err := copycerts.UploadCerts(client, data.Cfg(), data.CertificateKey()); err != nil { if err := copycerts.UploadCerts(client, data.Cfg(), data.CertificateKey()); err != nil {
return errors.Wrap(err, "error uploading certs") return errors.Wrap(err, "error uploading certs")
} }
if !data.SkipCertificateKeyPrint() {
fmt.Printf("[upload-certs] Using certificate key:\n%s\n", data.CertificateKey())
}
return nil return nil
} }

View File

@ -85,7 +85,7 @@ func CreateCertificateKey() (string, error) {
//UploadCerts save certs needs to join a new control-plane on kubeadm-certs sercret. //UploadCerts save certs needs to join a new control-plane on kubeadm-certs sercret.
func UploadCerts(client clientset.Interface, cfg *kubeadmapi.InitConfiguration, key string) error { func UploadCerts(client clientset.Interface, cfg *kubeadmapi.InitConfiguration, key string) error {
fmt.Printf("[upload-certs] storing the certificates in ConfigMap %q in the %q Namespace\n", kubeadmconstants.KubeadmCertsSecret, metav1.NamespaceSystem) fmt.Printf("[upload-certs] Storing the certificates in ConfigMap %q in the %q Namespace\n", kubeadmconstants.KubeadmCertsSecret, metav1.NamespaceSystem)
decodedKey, err := hex.DecodeString(key) decodedKey, err := hex.DecodeString(key)
if err != nil { if err != nil {
return err return err

View File

@ -168,20 +168,40 @@ func (pm *basicManager) UpdatePod(pod *v1.Pod) {
} }
} }
func isPodInTerminatedState(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded
}
// updatePodsInternal replaces the given pods in the current state of the // updatePodsInternal replaces the given pods in the current state of the
// manager, updating the various indices. The caller is assumed to hold the // manager, updating the various indices. The caller is assumed to hold the
// lock. // lock.
func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) { func (pm *basicManager) updatePodsInternal(pods ...*v1.Pod) {
for _, pod := range pods { for _, pod := range pods {
if pm.secretManager != nil { if pm.secretManager != nil {
// TODO: Consider detecting only status update and in such case do if isPodInTerminatedState(pod) {
// not register pod, as it doesn't really matter. // Pods that are in terminated state and no longer running can be
pm.secretManager.RegisterPod(pod) // ignored as they no longer require access to secrets.
// It is especially important in watch-based manager, to avoid
// unnecessary watches for terminated pods waiting for GC.
pm.secretManager.UnregisterPod(pod)
} else {
// TODO: Consider detecting only status update and in such case do
// not register pod, as it doesn't really matter.
pm.secretManager.RegisterPod(pod)
}
} }
if pm.configMapManager != nil { if pm.configMapManager != nil {
// TODO: Consider detecting only status update and in such case do if isPodInTerminatedState(pod) {
// not register pod, as it doesn't really matter. // Pods that are in terminated state and no longer running can be
pm.configMapManager.RegisterPod(pod) // ignored as they no longer require access to configmaps.
// It is especially important in watch-based manager, to avoid
// unnecessary watches for terminated pods waiting for GC.
pm.configMapManager.UnregisterPod(pod)
} else {
// TODO: Consider detecting only status update and in such case do
// not register pod, as it doesn't really matter.
pm.configMapManager.RegisterPod(pod)
}
} }
podFullName := kubecontainer.GetPodFullName(pod) podFullName := kubecontainer.GetPodFullName(pod)
// This logic relies on a static pod and its mirror to have the same name. // This logic relies on a static pod and its mirror to have the same name.

View File

@ -24,7 +24,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -429,6 +429,41 @@ func TestCacheInvalidation(t *testing.T) {
fakeClient.ClearActions() fakeClient.ClearActions()
} }
func TestRegisterIdempotence(t *testing.T) {
fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now())
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
manager := newCacheBasedSecretManager(store)
s1 := secretsToAttach{
imagePullSecretNames: []string{"s1"},
}
refs := func(ns, name string) int {
store.lock.Lock()
defer store.lock.Unlock()
item, ok := store.items[objectKey{ns, name}]
if !ok {
return 0
}
return item.refCount
}
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
assert.Equal(t, 1, refs("ns1", "s1"))
manager.RegisterPod(podWithSecrets("ns1", "name1", s1))
assert.Equal(t, 1, refs("ns1", "s1"))
manager.RegisterPod(podWithSecrets("ns1", "name2", s1))
assert.Equal(t, 2, refs("ns1", "s1"))
manager.UnregisterPod(podWithSecrets("ns1", "name1", s1))
assert.Equal(t, 1, refs("ns1", "s1"))
manager.UnregisterPod(podWithSecrets("ns1", "name1", s1))
assert.Equal(t, 1, refs("ns1", "s1"))
manager.UnregisterPod(podWithSecrets("ns1", "name2", s1))
assert.Equal(t, 0, refs("ns1", "s1"))
}
func TestCacheRefcounts(t *testing.T) { func TestCacheRefcounts(t *testing.T) {
fakeClient := &fake.Clientset{} fakeClient := &fake.Clientset{}
fakeClock := clock.NewFakeClock(time.Now()) fakeClock := clock.NewFakeClock(time.Now())

View File

@ -32,10 +32,14 @@ type Manager interface {
// i.e. should not block on network operations. // i.e. should not block on network operations.
// RegisterPod registers all objects referenced from a given pod. // RegisterPod registers all objects referenced from a given pod.
//
// NOTE: All implementations of RegisterPod should be idempotent.
RegisterPod(pod *v1.Pod) RegisterPod(pod *v1.Pod)
// UnregisterPod unregisters objects referenced from a given pod that are not // UnregisterPod unregisters objects referenced from a given pod that are not
// used by any other registered pod. // used by any other registered pod.
//
// NOTE: All implementations of UnregisterPod should be idempotent.
UnregisterPod(pod *v1.Pod) UnregisterPod(pod *v1.Pod)
} }

View File

@ -141,10 +141,13 @@ go_test(
deps = [ deps = [
"//pkg/api/legacyscheme:go_default_library", "//pkg/api/legacyscheme:go_default_library",
"//pkg/api/testapi:go_default_library", "//pkg/api/testapi:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/apis/storage:go_default_library",
"//pkg/generated/openapi:go_default_library", "//pkg/generated/openapi:go_default_library",
"//pkg/kubelet/client:go_default_library", "//pkg/kubelet/client:go_default_library",
"//pkg/master/reconcilers:go_default_library", "//pkg/master/reconcilers:go_default_library",
"//pkg/master/storageversionhashdata:go_default_library",
"//pkg/registry/certificates/rest:go_default_library", "//pkg/registry/certificates/rest:go_default_library",
"//pkg/registry/core/rest:go_default_library", "//pkg/registry/core/rest:go_default_library",
"//pkg/registry/registrytest:go_default_library", "//pkg/registry/registrytest:go_default_library",
@ -154,16 +157,23 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/apitesting/naming:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/apitesting/naming:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authorization/authorizerfactory:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/openapi:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/resourceconfig:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
@ -191,6 +201,7 @@ filegroup(
"//pkg/master/controller/crdregistration:all-srcs", "//pkg/master/controller/crdregistration:all-srcs",
"//pkg/master/ports:all-srcs", "//pkg/master/ports:all-srcs",
"//pkg/master/reconcilers:all-srcs", "//pkg/master/reconcilers:all-srcs",
"//pkg/master/storageversionhashdata:all-srcs",
"//pkg/master/tunneler:all-srcs", "//pkg/master/tunneler:all-srcs",
], ],
tags = ["automanaged"], tags = ["automanaged"],

View File

@ -31,22 +31,32 @@ import (
certificatesapiv1beta1 "k8s.io/api/certificates/v1beta1" certificatesapiv1beta1 "k8s.io/api/certificates/v1beta1"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/version" "k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/authorization/authorizerfactory"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/server/resourceconfig"
serverstorage "k8s.io/apiserver/pkg/server/storage" serverstorage "k8s.io/apiserver/pkg/server/storage"
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/apis/batch"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
apisstorage "k8s.io/kubernetes/pkg/apis/storage"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/master/reconcilers"
"k8s.io/kubernetes/pkg/master/storageversionhashdata"
certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest" certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest"
corerest "k8s.io/kubernetes/pkg/registry/core/rest" corerest "k8s.io/kubernetes/pkg/registry/core/rest"
"k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/registry/registrytest"
@ -70,6 +80,14 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
} }
resourceEncoding := serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme) resourceEncoding := serverstorage.NewDefaultResourceEncodingConfig(legacyscheme.Scheme)
// This configures the testing master the same way the real master is
// configured. The storage versions of these resources are different
// from the storage versions of other resources in their group.
resourceEncodingOverrides := []schema.GroupVersionResource{
batch.Resource("cronjobs").WithVersion("v1beta1"),
apisstorage.Resource("volumeattachments").WithVersion("v1beta1"),
}
resourceEncoding = resourceconfig.MergeResourceEncodingConfigs(resourceEncoding, resourceEncodingOverrides)
storageFactory := serverstorage.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), legacyscheme.Codecs, resourceEncoding, DefaultAPIResourceConfigSource(), nil) storageFactory := serverstorage.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), legacyscheme.Codecs, resourceEncoding, DefaultAPIResourceConfigSource(), nil)
etcdOptions := options.NewEtcdOptions(storageConfig) etcdOptions := options.NewEtcdOptions(storageConfig)
@ -81,12 +99,12 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, *assert.Assertion
} }
kubeVersion := kubeversion.Get() kubeVersion := kubeversion.Get()
config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
config.GenericConfig.Version = &kubeVersion config.GenericConfig.Version = &kubeVersion
config.ExtraConfig.StorageFactory = storageFactory config.ExtraConfig.StorageFactory = storageFactory
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}} config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}}
config.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4") config.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
config.GenericConfig.LegacyAPIGroupPrefixes = sets.NewString("/api") config.GenericConfig.LegacyAPIGroupPrefixes = sets.NewString("/api")
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs}}
config.ExtraConfig.KubeletClientConfig = kubeletclient.KubeletClientConfig{Port: 10250} config.ExtraConfig.KubeletClientConfig = kubeletclient.KubeletClientConfig{Port: 10250}
config.ExtraConfig.ProxyTransport = utilnet.SetTransportDefaults(&http.Transport{ config.ExtraConfig.ProxyTransport = utilnet.SetTransportDefaults(&http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { return nil, nil }, DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { return nil, nil },
@ -363,6 +381,112 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
} }
// This test doesn't cover the apiregistration and apiextensions group, as they are installed by other apiservers.
func TestStorageVersionHashes(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionHash, true)()
master, etcdserver, _, _ := newMaster(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(master.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
c := &restclient.Config{
Host: server.URL,
APIPath: "/api",
ContentConfig: restclient.ContentConfig{NegotiatedSerializer: legacyscheme.Codecs},
}
discover := discovery.NewDiscoveryClientForConfigOrDie(c)
all, err := discover.ServerResources()
if err != nil {
t.Error(err)
}
var count int
for _, g := range all {
for _, r := range g.APIResources {
if strings.Contains(r.Name, "/") ||
storageversionhashdata.NoStorageVersionHash.Has(g.GroupVersion+"/"+r.Name) {
if r.StorageVersionHash != "" {
t.Errorf("expect resource %s/%s to have empty storageVersionHash, got hash %q", g.GroupVersion, r.Name, r.StorageVersionHash)
}
continue
}
if r.StorageVersionHash == "" {
t.Errorf("expect the storageVersionHash of %s/%s to exist", g.GroupVersion, r.Name)
continue
}
// Uncomment the following line if you want to update storageversionhash/data.go
// fmt.Printf("\"%s/%s\": \"%s\",\n", g.GroupVersion, r.Name, r.StorageVersionHash)
expected := storageversionhashdata.GVRToStorageVersionHash[g.GroupVersion+"/"+r.Name]
if r.StorageVersionHash != expected {
t.Errorf("expect the storageVersionHash of %s/%s to be %q, got %q", g.GroupVersion, r.Name, expected, r.StorageVersionHash)
}
count++
}
}
if count != len(storageversionhashdata.GVRToStorageVersionHash) {
t.Errorf("please remove the redundant entries from GVRToStorageVersionHash")
}
}
func TestStorageVersionHashEqualities(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionHash, true)()
master, etcdserver, _, assert := newMaster(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(master.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
// Test 1: extensions/v1beta1/replicasets and apps/v1/replicasets have
// the same storage version hash.
resp, err := http.Get(server.URL + "/apis/extensions/v1beta1")
assert.Empty(err)
extList := metav1.APIResourceList{}
assert.NoError(decodeResponse(resp, &extList))
var extReplicasetHash, appsReplicasetHash string
for _, r := range extList.APIResources {
if r.Name == "replicasets" {
extReplicasetHash = r.StorageVersionHash
}
}
assert.NotEmpty(extReplicasetHash)
resp, err = http.Get(server.URL + "/apis/apps/v1")
assert.Empty(err)
appsList := metav1.APIResourceList{}
assert.NoError(decodeResponse(resp, &appsList))
for _, r := range appsList.APIResources {
if r.Name == "replicasets" {
appsReplicasetHash = r.StorageVersionHash
}
}
assert.Equal(extReplicasetHash, appsReplicasetHash)
// Test 2: batch/v1/jobs and batch/v1beta1/cronjobs have different
// storage version hashes.
resp, err = http.Get(server.URL + "/apis/batch/v1")
assert.Empty(err)
batchv1 := metav1.APIResourceList{}
assert.NoError(decodeResponse(resp, &batchv1))
var jobsHash string
for _, r := range batchv1.APIResources {
if r.Name == "jobs" {
jobsHash = r.StorageVersionHash
}
}
assert.NotEmpty(jobsHash)
resp, err = http.Get(server.URL + "/apis/batch/v1beta1")
assert.Empty(err)
batchv1beta1 := metav1.APIResourceList{}
assert.NoError(decodeResponse(resp, &batchv1beta1))
var cronjobsHash string
for _, r := range batchv1beta1.APIResources {
if r.Name == "cronjobs" {
cronjobsHash = r.StorageVersionHash
}
}
assert.NotEmpty(cronjobsHash)
assert.NotEqual(jobsHash, cronjobsHash)
}
func TestNoAlphaVersionsEnabledByDefault(t *testing.T) { func TestNoAlphaVersionsEnabledByDefault(t *testing.T) {
config := DefaultAPIResourceConfigSource() config := DefaultAPIResourceConfigSource()
for gv, enable := range config.GroupVersionConfigs { for gv, enable := range config.GroupVersionConfigs {

View File

@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["data.go"],
importpath = "k8s.io/kubernetes/pkg/master/storageversionhashdata",
visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,4 @@
approvers:
- api-approvers
reviewers:
- api-reviewers

View File

@ -0,0 +1,111 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package storageversionhashdata is for test only.
package storageversionhashdata
import (
"k8s.io/apimachinery/pkg/util/sets"
)
// NoStorageVersionHash lists resources that legitimately with empty storage
// version hash.
var NoStorageVersionHash = sets.NewString(
"v1/bindings",
"v1/componentstatuses",
"authentication.k8s.io/v1/tokenreviews",
"authorization.k8s.io/v1/localsubjectaccessreviews",
"authorization.k8s.io/v1/selfsubjectaccessreviews",
"authorization.k8s.io/v1/selfsubjectrulesreviews",
"authorization.k8s.io/v1/subjectaccessreviews",
"authentication.k8s.io/v1beta1/tokenreviews",
"authorization.k8s.io/v1beta1/localsubjectaccessreviews",
"authorization.k8s.io/v1beta1/selfsubjectaccessreviews",
"authorization.k8s.io/v1beta1/selfsubjectrulesreviews",
"authorization.k8s.io/v1beta1/subjectaccessreviews",
"extensions/v1beta1/replicationcontrollers",
)
// GVRToStorageVersionHash shouldn't change unless we intentionally change the
// storage version of a resource.
var GVRToStorageVersionHash = map[string]string{
"v1/configmaps": "qFsyl6wFWjQ=",
"v1/endpoints": "fWeeMqaN/OA=",
"v1/events": "r2yiGXH7wu8=",
"v1/limitranges": "EBKMFVe6cwo=",
"v1/namespaces": "Q3oi5N2YM8M=",
"v1/nodes": "XwShjMxG9Fs=",
"v1/persistentvolumeclaims": "QWTyNDq0dC4=",
"v1/persistentvolumes": "HN/zwEC+JgM=",
"v1/pods": "xPOwRZ+Yhw8=",
"v1/podtemplates": "LIXB2x4IFpk=",
"v1/replicationcontrollers": "Jond2If31h0=",
"v1/resourcequotas": "8uhSgffRX6w=",
"v1/secrets": "S6u1pOWzb84=",
"v1/serviceaccounts": "pbx9ZvyFpBE=",
"v1/services": "0/CO1lhkEBI=",
"autoscaling/v1/horizontalpodautoscalers": "oQlkt7f5j/A=",
"autoscaling/v2beta1/horizontalpodautoscalers": "oQlkt7f5j/A=",
"autoscaling/v2beta2/horizontalpodautoscalers": "oQlkt7f5j/A=",
"batch/v1/jobs": "mudhfqk/qZY=",
"batch/v1beta1/cronjobs": "h/JlFAZkyyY=",
"certificates.k8s.io/v1beta1/certificatesigningrequests": "UQh3YTCDIf0=",
"coordination.k8s.io/v1beta1/leases": "/sY7hl8ol1U=",
"coordination.k8s.io/v1/leases": "/sY7hl8ol1U=",
"extensions/v1beta1/daemonsets": "dd7pWHUlMKQ=",
"extensions/v1beta1/deployments": "8aSe+NMegvE=",
"extensions/v1beta1/ingresses": "Ejja63IbU0E=",
"extensions/v1beta1/networkpolicies": "YpfwF18m1G8=",
"extensions/v1beta1/podsecuritypolicies": "khBLobUXkqA=",
"extensions/v1beta1/replicasets": "P1RzHs8/mWQ=",
"networking.k8s.io/v1/networkpolicies": "YpfwF18m1G8=",
"networking.k8s.io/v1beta1/ingresses": "Ejja63IbU0E=",
"node.k8s.io/v1beta1/runtimeclasses": "8nMHWqj34s0=",
"policy/v1beta1/poddisruptionbudgets": "6BGBu0kpHtk=",
"policy/v1beta1/podsecuritypolicies": "khBLobUXkqA=",
"rbac.authorization.k8s.io/v1/clusterrolebindings": "48tpQ8gZHFc=",
"rbac.authorization.k8s.io/v1/clusterroles": "bYE5ZWDrJ44=",
"rbac.authorization.k8s.io/v1/rolebindings": "eGsCzGH6b1g=",
"rbac.authorization.k8s.io/v1/roles": "7FuwZcIIItM=",
"rbac.authorization.k8s.io/v1beta1/clusterrolebindings": "48tpQ8gZHFc=",
"rbac.authorization.k8s.io/v1beta1/clusterroles": "bYE5ZWDrJ44=",
"rbac.authorization.k8s.io/v1beta1/rolebindings": "eGsCzGH6b1g=",
"rbac.authorization.k8s.io/v1beta1/roles": "7FuwZcIIItM=",
"scheduling.k8s.io/v1beta1/priorityclasses": "D3vHs+OgrtA=",
"scheduling.k8s.io/v1/priorityclasses": "D3vHs+OgrtA=",
"storage.k8s.io/v1/storageclasses": "K+m6uJwbjGY=",
"storage.k8s.io/v1/volumeattachments": "vQAqD28V4AY=",
"storage.k8s.io/v1beta1/csidrivers": "hL6j/rwBV5w=",
"storage.k8s.io/v1beta1/csinodes": "Pe62DkZtjuo=",
"storage.k8s.io/v1beta1/storageclasses": "K+m6uJwbjGY=",
"storage.k8s.io/v1beta1/volumeattachments": "vQAqD28V4AY=",
"apps/v1/controllerrevisions": "85nkx63pcBU=",
"apps/v1/daemonsets": "dd7pWHUlMKQ=",
"apps/v1/deployments": "8aSe+NMegvE=",
"apps/v1/replicasets": "P1RzHs8/mWQ=",
"apps/v1/statefulsets": "H+vl74LkKdo=",
"apps/v1beta2/controllerrevisions": "85nkx63pcBU=",
"apps/v1beta2/daemonsets": "dd7pWHUlMKQ=",
"apps/v1beta2/deployments": "8aSe+NMegvE=",
"apps/v1beta2/replicasets": "P1RzHs8/mWQ=",
"apps/v1beta2/statefulsets": "H+vl74LkKdo=",
"apps/v1beta1/controllerrevisions": "85nkx63pcBU=",
"apps/v1beta1/deployments": "8aSe+NMegvE=",
"apps/v1beta1/statefulsets": "H+vl74LkKdo=",
"admissionregistration.k8s.io/v1beta1/mutatingwebhookconfigurations": "yxW1cpLtfp8=",
"admissionregistration.k8s.io/v1beta1/validatingwebhookconfigurations": "P9NhrezfnWE=",
"events.k8s.io/v1beta1/events": "r2yiGXH7wu8=",
}

View File

@ -226,6 +226,12 @@ func (r *REST) ShortNames() []string {
return []string{"ns"} return []string{"ns"}
} }
var _ rest.StorageVersionProvider = &REST{}
func (r *REST) StorageVersion() runtime.GroupVersioner {
return r.store.StorageVersion()
}
func (r *StatusREST) New() runtime.Object { func (r *StatusREST) New() runtime.Object {
return r.store.New() return r.store.New()
} }

View File

@ -77,6 +77,7 @@ type ServiceStorage interface {
rest.Watcher rest.Watcher
rest.TableConvertor rest.TableConvertor
rest.Exporter rest.Exporter
rest.StorageVersionProvider
} }
type EndpointsStorage interface { type EndpointsStorage interface {
@ -108,11 +109,16 @@ func NewREST(
} }
var ( var (
_ ServiceStorage = &REST{} _ ServiceStorage = &REST{}
_ rest.CategoriesProvider = &REST{} _ rest.CategoriesProvider = &REST{}
_ rest.ShortNamesProvider = &REST{} _ rest.ShortNamesProvider = &REST{}
_ rest.StorageVersionProvider = &REST{}
) )
func (rs *REST) StorageVersion() runtime.GroupVersioner {
return rs.services.StorageVersion()
}
// ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource. // ShortNames implements the ShortNamesProvider interface. Returns a list of short names for a resource.
func (rs *REST) ShortNames() []string { func (rs *REST) ShortNames() []string {
return []string{"svc"} return []string{"svc"}

View File

@ -159,6 +159,10 @@ func (s *serviceStorage) Export(ctx context.Context, name string, opts metav1.Ex
panic("not implemented") panic("not implemented")
} }
func (s *serviceStorage) StorageVersion() runtime.GroupVersioner {
panic("not implemented")
}
func generateRandomNodePort() int32 { func generateRandomNodePort() int32 {
return int32(rand.IntnRange(30001, 30999)) return int32(rand.IntnRange(30001, 30999))
} }

View File

@ -50,6 +50,16 @@ func (r *Storage) NamespaceScoped() bool {
return false return false
} }
func (r *Storage) StorageVersion() runtime.GroupVersioner {
svp, ok := r.StandardStorage.(rest.StorageVersionProvider)
if !ok {
return nil
}
return svp.StorageVersion()
}
var _ rest.StorageVersionProvider = &Storage{}
var fullAuthority = []rbac.PolicyRule{ var fullAuthority = []rbac.PolicyRule{
rbac.NewRule("*").Groups("*").Resources("*").RuleOrDie(), rbac.NewRule("*").Groups("*").Resources("*").RuleOrDie(),
rbac.NewRule("*").URLs("*").RuleOrDie(), rbac.NewRule("*").URLs("*").RuleOrDie(),

View File

@ -51,6 +51,16 @@ func (r *Storage) NamespaceScoped() bool {
return false return false
} }
func (r *Storage) StorageVersion() runtime.GroupVersioner {
svp, ok := r.StandardStorage.(rest.StorageVersionProvider)
if !ok {
return nil
}
return svp.StorageVersion()
}
var _ rest.StorageVersionProvider = &Storage{}
func (s *Storage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { func (s *Storage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if rbacregistry.EscalationAllowed(ctx) { if rbacregistry.EscalationAllowed(ctx) {
return s.StandardStorage.Create(ctx, obj, createValidation, options) return s.StandardStorage.Create(ctx, obj, createValidation, options)

View File

@ -49,6 +49,16 @@ func (r *Storage) NamespaceScoped() bool {
return true return true
} }
func (r *Storage) StorageVersion() runtime.GroupVersioner {
svp, ok := r.StandardStorage.(rest.StorageVersionProvider)
if !ok {
return nil
}
return svp.StorageVersion()
}
var _ rest.StorageVersionProvider = &Storage{}
func (s *Storage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { func (s *Storage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if rbacregistry.EscalationAllowed(ctx) || rbacregistry.RoleEscalationAuthorized(ctx, s.authorizer) { if rbacregistry.EscalationAllowed(ctx) || rbacregistry.RoleEscalationAuthorized(ctx, s.authorizer) {
return s.StandardStorage.Create(ctx, obj, createValidation, options) return s.StandardStorage.Create(ctx, obj, createValidation, options)

View File

@ -52,6 +52,16 @@ func (r *Storage) NamespaceScoped() bool {
return true return true
} }
func (r *Storage) StorageVersion() runtime.GroupVersioner {
svp, ok := r.StandardStorage.(rest.StorageVersionProvider)
if !ok {
return nil
}
return svp.StorageVersion()
}
var _ rest.StorageVersionProvider = &Storage{}
func (s *Storage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { func (s *Storage) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
if rbacregistry.EscalationAllowed(ctx) { if rbacregistry.EscalationAllowed(ctx) {
return s.StandardStorage.Create(ctx, obj, createValidation, options) return s.StandardStorage.Create(ctx, obj, createValidation, options)

View File

@ -9,7 +9,9 @@ go_library(
"exec_mount_unsupported.go", "exec_mount_unsupported.go",
"fake.go", "fake.go",
"mount.go", "mount.go",
"mount_helper.go", "mount_helper_common.go",
"mount_helper_unix.go",
"mount_helper_windows.go",
"mount_linux.go", "mount_linux.go",
"mount_unsupported.go", "mount_unsupported.go",
"mount_windows.go", "mount_windows.go",

View File

@ -19,7 +19,6 @@ package mount
import ( import (
"fmt" "fmt"
"os" "os"
"syscall"
"k8s.io/klog" "k8s.io/klog"
) )
@ -102,23 +101,3 @@ func PathExists(path string) (bool, error) {
return false, err return false, err
} }
} }
// IsCorruptedMnt return true if err is about corrupted mount point
func IsCorruptedMnt(err error) bool {
if err == nil {
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
underlyingError = pe.Err
}
return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES
}

View File

@ -0,0 +1,44 @@
// +build !windows
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mount
import (
"os"
"syscall"
)
// IsCorruptedMnt return true if err is about corrupted mount point
func IsCorruptedMnt(err error) bool {
if err == nil {
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
underlyingError = pe.Err
}
return underlyingError == syscall.ENOTCONN || underlyingError == syscall.ESTALE || underlyingError == syscall.EIO || underlyingError == syscall.EACCES
}

View File

@ -0,0 +1,68 @@
// +build windows
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mount
import (
"os"
"syscall"
"k8s.io/klog"
)
// following failure codes are from https://docs.microsoft.com/en-us/windows/desktop/debug/system-error-codes--1300-1699-
// ERROR_BAD_NETPATH = 53
// ERROR_NETWORK_BUSY = 54
// ERROR_UNEXP_NET_ERR = 59
// ERROR_NETNAME_DELETED = 64
// ERROR_NETWORK_ACCESS_DENIED = 65
// ERROR_BAD_DEV_TYPE = 66
// ERROR_BAD_NET_NAME = 67
// ERROR_SESSION_CREDENTIAL_CONFLICT = 1219
// ERROR_LOGON_FAILURE = 1326
var errorNoList = [...]int{53, 54, 59, 64, 65, 66, 67, 1219, 1326}
// IsCorruptedMnt return true if err is about corrupted mount point
func IsCorruptedMnt(err error) bool {
if err == nil {
return false
}
var underlyingError error
switch pe := err.(type) {
case nil:
return false
case *os.PathError:
underlyingError = pe.Err
case *os.LinkError:
underlyingError = pe.Err
case *os.SyscallError:
underlyingError = pe.Err
}
if ee, ok := underlyingError.(syscall.Errno); ok {
for _, errno := range errorNoList {
if int(ee) == errno {
klog.Warningf("IsCorruptedMnt failed with error: %v, error code: %v", err, errno)
return true
}
}
}
return false
}

View File

@ -378,14 +378,15 @@ func getAllParentLinks(path string) ([]string, error) {
// GetMountRefs : empty implementation here since there is no place to query all mount points on Windows // GetMountRefs : empty implementation here since there is no place to query all mount points on Windows
func (mounter *Mounter) GetMountRefs(pathname string) ([]string, error) { func (mounter *Mounter) GetMountRefs(pathname string) ([]string, error) {
pathExists, pathErr := PathExists(normalizeWindowsPath(pathname)) windowsPath := normalizeWindowsPath(pathname)
// TODO(#75012): Need a Windows specific IsCorruptedMnt function that checks against whatever errno's pathExists, pathErr := PathExists(windowsPath)
// Windows emits when we try to Stat a corrupted mount
// https://golang.org/pkg/syscall/?GOOS=windows&GOARCH=amd64#Errno
if !pathExists { if !pathExists {
return []string{}, nil return []string{}, nil
} else if IsCorruptedMnt(pathErr) {
klog.Warningf("GetMountRefs found corrupted mount at %s, treating as unmounted path", windowsPath)
return []string{}, nil
} else if pathErr != nil { } else if pathErr != nil {
return nil, fmt.Errorf("error checking path %s: %v", normalizeWindowsPath(pathname), pathErr) return nil, fmt.Errorf("error checking path %s: %v", windowsPath, pathErr)
} }
return []string{pathname}, nil return []string{pathname}, nil
} }

View File

@ -37,8 +37,8 @@ import (
) )
type csiBlockMapper struct { type csiBlockMapper struct {
csiClientGetter
k8s kubernetes.Interface k8s kubernetes.Interface
csiClient csiClient
plugin *csiPlugin plugin *csiPlugin
driverName csiDriverName driverName csiDriverName
specName string specName string
@ -247,14 +247,20 @@ func (m *csiBlockMapper) SetUpDevice() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()
csiClient, err := m.csiClientGetter.Get()
if err != nil {
klog.Error(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
return "", err
}
// Call NodeStageVolume // Call NodeStageVolume
stagingPath, err := m.stageVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment) stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
if err != nil { if err != nil {
return "", err return "", err
} }
// Call NodePublishVolume // Call NodePublishVolume
publishPath, err := m.publishVolumeForBlock(ctx, m.csiClient, accessMode, csiSource, attachment, stagingPath) publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment, stagingPath)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -326,6 +332,12 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()
csiClient, err := m.csiClientGetter.Get()
if err != nil {
klog.Error(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
return err
}
// Call NodeUnpublishVolume // Call NodeUnpublishVolume
publishPath := m.getPublishPath() publishPath := m.getPublishPath()
if _, err := os.Stat(publishPath); err != nil { if _, err := os.Stat(publishPath); err != nil {
@ -335,7 +347,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return err return err
} }
} else { } else {
err := m.unpublishVolumeForBlock(ctx, m.csiClient, publishPath) err := m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
if err != nil { if err != nil {
return err return err
} }
@ -350,7 +362,7 @@ func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error
return err return err
} }
} else { } else {
err := m.unstageVolumeForBlock(ctx, m.csiClient, stagingPath) err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
if err != nil { if err != nil {
return err return err
} }

View File

@ -23,6 +23,7 @@ import (
"io" "io"
"net" "net"
"strings" "strings"
"sync"
"time" "time"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
@ -807,3 +808,36 @@ func versionRequiresV0Client(version *utilversion.Version) bool {
return false return false
} }
// CSI client getter with cache.
// This provides a method to initialize CSI client with driver name and caches
// it for later use. When CSI clients have not been discovered yet (e.g.
// on kubelet restart), client initialization will fail. Users of CSI client (e.g.
// mounter manager and block mapper) can use this to delay CSI client
// initialization until needed.
type csiClientGetter struct {
sync.RWMutex
csiClient csiClient
driverName csiDriverName
}
func (c *csiClientGetter) Get() (csiClient, error) {
c.RLock()
if c.csiClient != nil {
c.RUnlock()
return c.csiClient, nil
}
c.RUnlock()
c.Lock()
defer c.Unlock()
// Double-checking locking criterion.
if c.csiClient != nil {
return c.csiClient, nil
}
csi, err := newCsiDriverClient(c.driverName)
if err != nil {
return nil, err
}
c.csiClient = csi
return c.csiClient, nil
}

View File

@ -56,7 +56,7 @@ var (
) )
type csiMountMgr struct { type csiMountMgr struct {
csiClient csiClient csiClientGetter
k8s kubernetes.Interface k8s kubernetes.Interface
plugin *csiPlugin plugin *csiPlugin
driverName csiDriverName driverName csiDriverName
@ -111,7 +111,11 @@ func (c *csiMountMgr) SetUpAt(dir string, fsGroup *int64) error {
return nil return nil
} }
csi := c.csiClient csi, err := c.csiClientGetter.Get()
if err != nil {
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
return err
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()
@ -343,7 +347,11 @@ func (c *csiMountMgr) TearDownAt(dir string) error {
klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir)) klog.V(4).Infof(log("Unmounter.TearDown(%s)", dir))
volID := c.volumeID volID := c.volumeID
csi := c.csiClient csi, err := c.csiClientGetter.Get()
if err != nil {
klog.Error(log("mounter.SetUpAt failed to get CSI client: %v", err))
return err
}
ctx, cancel := context.WithTimeout(context.Background(), csiTimeout) ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
defer cancel() defer cancel()

View File

@ -383,11 +383,6 @@ func (p *csiPlugin) NewMounter(
return nil, errors.New("failed to get a Kubernetes client") return nil, errors.New("failed to get a Kubernetes client")
} }
csi, err := newCsiDriverClient(csiDriverName(driverName))
if err != nil {
return nil, err
}
mounter := &csiMountMgr{ mounter := &csiMountMgr{
plugin: p, plugin: p,
k8s: k8s, k8s: k8s,
@ -398,9 +393,9 @@ func (p *csiPlugin) NewMounter(
driverMode: driverMode, driverMode: driverMode,
volumeID: volumeHandle, volumeID: volumeHandle,
specVolumeID: spec.Name(), specVolumeID: spec.Name(),
csiClient: csi,
readOnly: readOnly, readOnly: readOnly,
} }
mounter.csiClientGetter.driverName = csiDriverName(driverName)
// Save volume info in pod dir // Save volume info in pod dir
dir := mounter.GetPath() dir := mounter.GetPath()
@ -458,10 +453,7 @@ func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmo
} }
unmounter.driverName = csiDriverName(data[volDataKey.driverName]) unmounter.driverName = csiDriverName(data[volDataKey.driverName])
unmounter.volumeID = data[volDataKey.volHandle] unmounter.volumeID = data[volDataKey.volHandle]
unmounter.csiClient, err = newCsiDriverClient(unmounter.driverName) unmounter.csiClientGetter.driverName = unmounter.driverName
if err != nil {
return nil, err
}
return unmounter, nil return unmounter, nil
} }
@ -638,10 +630,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
} }
klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver)) klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
client, err := newCsiDriverClient(csiDriverName(pvSource.Driver))
if err != nil {
return nil, err
}
k8s := p.host.GetKubeClient() k8s := p.host.GetKubeClient()
if k8s == nil { if k8s == nil {
@ -650,7 +638,6 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
} }
mapper := &csiBlockMapper{ mapper := &csiBlockMapper{
csiClient: client,
k8s: k8s, k8s: k8s,
plugin: p, plugin: p,
volumeID: pvSource.VolumeHandle, volumeID: pvSource.VolumeHandle,
@ -660,6 +647,7 @@ func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opt
specName: spec.Name(), specName: spec.Name(),
podUID: podRef.UID, podUID: podRef.UID,
} }
mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
// Save volume info in pod dir // Save volume info in pod dir
dataDir := getVolumeDeviceDataDir(spec.Name(), p.host) dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
@ -714,7 +702,7 @@ func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (vo
} }
unmapper.driverName = csiDriverName(data[volDataKey.driverName]) unmapper.driverName = csiDriverName(data[volDataKey.driverName])
unmapper.volumeID = data[volDataKey.volHandle] unmapper.volumeID = data[volDataKey.volHandle]
unmapper.csiClient, err = newCsiDriverClient(unmapper.driverName) unmapper.csiClientGetter.driverName = unmapper.driverName
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -614,7 +614,8 @@ func TestPluginNewMounter(t *testing.T) {
if string(csiMounter.podUID) != string(test.podUID) { if string(csiMounter.podUID) != string(test.podUID) {
t.Error("mounter podUID not set") t.Error("mounter podUID not set")
} }
if csiMounter.csiClient == nil { csiClient, err := csiMounter.csiClientGetter.Get()
if csiClient == nil {
t.Error("mounter csiClient is nil") t.Error("mounter csiClient is nil")
} }
if csiMounter.driverMode != test.driverMode { if csiMounter.driverMode != test.driverMode {
@ -732,7 +733,8 @@ func TestPluginNewMounterWithInline(t *testing.T) {
if string(csiMounter.podUID) != string(test.podUID) { if string(csiMounter.podUID) != string(test.podUID) {
t.Error("mounter podUID not set") t.Error("mounter podUID not set")
} }
if csiMounter.csiClient == nil { csiClient, err := csiMounter.csiClientGetter.Get()
if csiClient == nil {
t.Error("mounter csiClient is nil") t.Error("mounter csiClient is nil")
} }
if csiMounter.driverMode != test.driverMode { if csiMounter.driverMode != test.driverMode {
@ -815,8 +817,9 @@ func TestPluginNewUnmounter(t *testing.T) {
t.Error("podUID not set") t.Error("podUID not set")
} }
if csiUnmounter.csiClient == nil { csiClient, err := csiUnmounter.csiClientGetter.Get()
t.Error("unmounter csiClient is nil") if csiClient == nil {
t.Error("mounter csiClient is nil")
} }
} }
@ -932,7 +935,8 @@ func TestPluginNewBlockMapper(t *testing.T) {
if csiMapper.podUID == types.UID("") { if csiMapper.podUID == types.UID("") {
t.Error("CSI block mapper missing pod.UID") t.Error("CSI block mapper missing pod.UID")
} }
if csiMapper.csiClient == nil { csiClient, err := csiMapper.csiClientGetter.Get()
if csiClient == nil {
t.Error("mapper csiClient is nil") t.Error("mapper csiClient is nil")
} }
@ -994,7 +998,8 @@ func TestPluginNewUnmapper(t *testing.T) {
t.Error("specName not set") t.Error("specName not set")
} }
if csiUnmapper.csiClient == nil { csiClient, err := csiUnmapper.csiClientGetter.Get()
if csiClient == nil {
t.Error("unmapper csiClient is nil") t.Error("unmapper csiClient is nil")
} }

View File

@ -254,6 +254,7 @@ func (mounter *quobyteMounter) SetUpAt(dir string, fsGroup *int64) error {
os.MkdirAll(dir, 0750) os.MkdirAll(dir, 0750)
var options []string var options []string
options = append(options, "allow-usermapping-in-volumename")
if mounter.readOnly { if mounter.readOnly {
options = append(options, "ro") options = append(options, "ro")
} }

View File

@ -95,6 +95,7 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
} }
foundThisVersion := false foundThisVersion := false
var storageVersionHash string
for _, v := range crd.Spec.Versions { for _, v := range crd.Spec.Versions {
if !v.Served { if !v.Served {
continue continue
@ -113,6 +114,9 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
if v.Name == version.Version { if v.Name == version.Version {
foundThisVersion = true foundThisVersion = true
} }
if v.Storage {
storageVersionHash = discovery.StorageVersionHash(gv.Group, gv.Version, crd.Spec.Names.Kind)
}
} }
if !foundThisVersion { if !foundThisVersion {
@ -127,13 +131,14 @@ func (c *DiscoveryController) sync(version schema.GroupVersion) error {
} }
apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{ apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
Name: crd.Status.AcceptedNames.Plural, Name: crd.Status.AcceptedNames.Plural,
SingularName: crd.Status.AcceptedNames.Singular, SingularName: crd.Status.AcceptedNames.Singular,
Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped, Namespaced: crd.Spec.Scope == apiextensions.NamespaceScoped,
Kind: crd.Status.AcceptedNames.Kind, Kind: crd.Status.AcceptedNames.Kind,
Verbs: verbs, Verbs: verbs,
ShortNames: crd.Status.AcceptedNames.ShortNames, ShortNames: crd.Status.AcceptedNames.ShortNames,
Categories: crd.Status.AcceptedNames.Categories, Categories: crd.Status.AcceptedNames.Categories,
StorageVersionHash: storageVersionHash,
}) })
subresources, err := apiextensions.GetSubresourcesForVersion(crd, version.Version) subresources, err := apiextensions.GetSubresourcesForVersion(crd, version.Version)

View File

@ -32,6 +32,7 @@ go_library(
"group.go", "group.go",
"legacy.go", "legacy.go",
"root.go", "root.go",
"storageversionhash.go",
"util.go", "util.go",
"version.go", "version.go",
], ],

View File

@ -0,0 +1,40 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import (
"crypto/sha256"
"encoding/base64"
)
// StorageVersionHash calculates the storage version hash for a
// <group/version/kind> tuple.
// WARNING: this function is subject to change. Clients shouldn't depend on
// this function.
func StorageVersionHash(group, version, kind string) string {
gvk := group + "/" + version + "/" + kind
if gvk == "" {
return ""
}
bytes := sha256.Sum256([]byte(gvk))
// Assuming there are N kinds in the cluster, and the hash is X-byte long,
// the chance of colliding hash P(N,X) approximates to 1-e^(-(N^2)/2^(8X+1)).
// P(10,000, 8) ~= 2.7*10^(-12), which is low enough.
// See https://en.wikipedia.org/wiki/Birthday_problem#Approximations.
return base64.StdEncoding.EncodeToString(
bytes[:8])
}

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/endpoints/handlers" "k8s.io/apiserver/pkg/endpoints/handlers"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation" "k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
@ -133,6 +134,20 @@ func (a *APIInstaller) newWebService() *restful.WebService {
return ws return ws
} }
// calculate the storage gvk, the gvk objects are converted to before persisted to the etcd.
func getStorageVersionKind(storageVersioner runtime.GroupVersioner, storage rest.Storage, typer runtime.ObjectTyper) (schema.GroupVersionKind, error) {
object := storage.New()
fqKinds, _, err := typer.ObjectKinds(object)
if err != nil {
return schema.GroupVersionKind{}, err
}
gvk, ok := storageVersioner.KindForGroupVersionKinds(fqKinds)
if !ok {
return schema.GroupVersionKind{}, fmt.Errorf("cannot find the storage version kind for %v", reflect.TypeOf(object))
}
return gvk, nil
}
// GetResourceKind returns the external group version kind registered for the given storage // GetResourceKind returns the external group version kind registered for the given storage
// object. If the storage object is a subresource and has an override supplied for it, it returns // object. If the storage object is a subresource and has an override supplied for it, it returns
// the group version kind supplied in the override. // the group version kind supplied in the override.
@ -227,6 +242,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
watcher, isWatcher := storage.(rest.Watcher) watcher, isWatcher := storage.(rest.Watcher)
connecter, isConnecter := storage.(rest.Connecter) connecter, isConnecter := storage.(rest.Connecter)
storageMeta, isMetadata := storage.(rest.StorageMetadata) storageMeta, isMetadata := storage.(rest.StorageMetadata)
storageVersionProvider, isStorageVersionProvider := storage.(rest.StorageVersionProvider)
if !isMetadata { if !isMetadata {
storageMeta = defaultStorageMetadata{} storageMeta = defaultStorageMetadata{}
} }
@ -365,6 +381,17 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
tableProvider, _ := storage.(rest.TableConvertor) tableProvider, _ := storage.(rest.TableConvertor)
var apiResource metav1.APIResource var apiResource metav1.APIResource
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionHash) &&
isStorageVersionProvider &&
storageVersionProvider.StorageVersion() != nil {
versioner := storageVersionProvider.StorageVersion()
gvk, err := getStorageVersionKind(versioner, storage, a.group.Typer)
if err != nil {
return nil, err
}
apiResource.StorageVersionHash = discovery.StorageVersionHash(gvk.Group, gvk.Version, gvk.Kind)
}
// Get the list of actions for the given scope. // Get the list of actions for the given scope.
switch { switch {
case !namespaceScoped: case !namespaceScoped:

View File

@ -89,6 +89,13 @@ const (
// Server-side apply. Merging happens on the server. // Server-side apply. Merging happens on the server.
ServerSideApply utilfeature.Feature = "ServerSideApply" ServerSideApply utilfeature.Feature = "ServerSideApply"
// owner: @caesarxuchao
// alpha: v1.14
//
// Allow apiservers to expose the storage version hash in the discovery
// document.
StorageVersionHash utilfeature.Feature = "StorageVersionHash"
// owner: @ksubrmnn // owner: @ksubrmnn
// alpha: v1.14 // alpha: v1.14
// //
@ -118,6 +125,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
APIListChunking: {Default: true, PreRelease: utilfeature.Beta}, APIListChunking: {Default: true, PreRelease: utilfeature.Beta},
DryRun: {Default: true, PreRelease: utilfeature.Beta}, DryRun: {Default: true, PreRelease: utilfeature.Beta},
ServerSideApply: {Default: false, PreRelease: utilfeature.Alpha}, ServerSideApply: {Default: false, PreRelease: utilfeature.Alpha},
StorageVersionHash: {Default: false, PreRelease: utilfeature.Alpha},
WinOverlay: {Default: false, PreRelease: utilfeature.Alpha}, WinOverlay: {Default: false, PreRelease: utilfeature.Alpha},
WinDSR: {Default: false, PreRelease: utilfeature.Alpha}, WinDSR: {Default: false, PreRelease: utilfeature.Alpha},
} }

View File

@ -177,6 +177,12 @@ type Store struct {
// resource. It is wrapped into a "DryRunnableStorage" that will // resource. It is wrapped into a "DryRunnableStorage" that will
// either pass-through or simply dry-run. // either pass-through or simply dry-run.
Storage DryRunnableStorage Storage DryRunnableStorage
// StorageVersioner outputs the <group/version/kind> an object will be
// converted to before persisted in etcd, given a list of possible
// kinds of the object.
// If the StorageVersioner is nil, apiserver will leave the
// storageVersionHash as empty in the discovery document.
StorageVersioner runtime.GroupVersioner
// Called to cleanup clients used by the underlying Storage; optional. // Called to cleanup clients used by the underlying Storage; optional.
DestroyFunc func() DestroyFunc func()
} }
@ -1287,6 +1293,7 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
attrFunc, attrFunc,
triggerFunc, triggerFunc,
) )
e.StorageVersioner = opts.StorageConfig.EncodeVersioner
if opts.CountMetricPollPeriod > 0 { if opts.CountMetricPollPeriod > 0 {
stopFunc := e.startObservingCount(opts.CountMetricPollPeriod) stopFunc := e.startObservingCount(opts.CountMetricPollPeriod)
@ -1327,3 +1334,7 @@ func (e *Store) ConvertToTable(ctx context.Context, object runtime.Object, table
} }
return rest.NewDefaultTableConvertor(e.qualifiedResourceFromContext(ctx)).ConvertToTable(ctx, object, tableOptions) return rest.NewDefaultTableConvertor(e.qualifiedResourceFromContext(ctx)).ConvertToTable(ctx, object, tableOptions)
} }
func (e *Store) StorageVersion() runtime.GroupVersioner {
return e.StorageVersioner
}

View File

@ -332,3 +332,12 @@ type StorageMetadata interface {
// it is not nil. Only the type of the return object matters, the value will be ignored. // it is not nil. Only the type of the return object matters, the value will be ignored.
ProducesObject(verb string) interface{} ProducesObject(verb string) interface{}
} }
// StorageVersionProvider is an optional interface that a storage object can
// implement if it wishes to disclose its storage version.
type StorageVersionProvider interface {
// StorageVersion returns a group versioner, which will outputs the gvk
// an object will be converted to before persisted in etcd, given a
// list of kinds the object might belong to.
StorageVersion() runtime.GroupVersioner
}

View File

@ -40,15 +40,15 @@ type StorageCodecConfig struct {
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested // NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions. // storage and memory versions.
func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) { func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, runtime.GroupVersioner, error) {
mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType) mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
if err != nil { if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType) return nil, nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
} }
serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType) serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok { if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", mediaType) return nil, nil, fmt.Errorf("unable to find serializer for %q", mediaType)
} }
s := serializer.Serializer s := serializer.Serializer
@ -74,14 +74,16 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
decoders = opts.DecoderDecoratorFn(decoders) decoders = opts.DecoderDecoratorFn(decoders)
} }
encodeVersioner := runtime.NewMultiGroupVersioner(
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
)
// Ensure the storage receives the correct version. // Ensure the storage receives the correct version.
encoder = opts.StorageSerializer.EncoderForVersion( encoder = opts.StorageSerializer.EncoderForVersion(
encoder, encoder,
runtime.NewMultiGroupVersioner( encodeVersioner,
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
),
) )
decoder := opts.StorageSerializer.DecoderToVersion( decoder := opts.StorageSerializer.DecoderToVersion(
recognizer.NewDecoder(decoders...), recognizer.NewDecoder(decoders...),
@ -92,5 +94,5 @@ func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
), ),
) )
return runtime.NewCodec(encoder, decoder), nil return runtime.NewCodec(encoder, decoder), encodeVersioner, nil
} }

View File

@ -86,7 +86,7 @@ type DefaultStorageFactory struct {
APIResourceConfigSource APIResourceConfigSource APIResourceConfigSource APIResourceConfigSource
// newStorageCodecFn exists to be overwritten for unit testing. // newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, err error) newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, encodeVersioner runtime.GroupVersioner, err error)
} }
type groupResourceOverrides struct { type groupResourceOverrides struct {
@ -278,7 +278,7 @@ func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*
} }
codecConfig.Config = storageConfig codecConfig.Config = storageConfig
storageConfig.Codec, err = s.newStorageCodecFn(codecConfig) storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -57,6 +57,11 @@ type Config struct {
Paging bool Paging bool
Codec runtime.Codec Codec runtime.Codec
// EncodeVersioner is the same groupVersioner used to build the
// storage encoder. Given a list of kinds the input object might belong
// to, the EncodeVersioner outputs the gvk the object will be
// converted to before persisted in etcd.
EncodeVersioner runtime.GroupVersioner
// Transformer allows the value to be transformed prior to persisting into etcd. // Transformer allows the value to be transformed prior to persisting into etcd.
Transformer value.Transformer Transformer value.Transformer

View File

@ -184,13 +184,17 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
continue continue
case watch.Error: case watch.Error:
status, ok := event.Object.(*metav1.Status) // This round trip allows us to handle unstructured status
errObject := apierrors.FromObject(event.Object)
statusErr, ok := errObject.(*apierrors.StatusError)
if !ok { if !ok {
klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object)) klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
// Retry unknown errors // Retry unknown errors
return false, 0 return false, 0
} }
status := statusErr.ErrStatus
statusDelay := time.Duration(0) statusDelay := time.Duration(0)
if status.Details != nil { if status.Details != nil {
statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second

View File

@ -11,10 +11,13 @@ go_library(
importmap = "k8s.io/kubernetes/vendor/k8s.io/sample-apiserver/pkg/cmd/server", importmap = "k8s.io/kubernetes/vendor/k8s.io/sample-apiserver/pkg/cmd/server",
importpath = "k8s.io/sample-apiserver/pkg/cmd/server", importpath = "k8s.io/sample-apiserver/pkg/cmd/server",
deps = [ deps = [
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library", "//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/options:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/sample-apiserver/pkg/admission/plugin/banflunder:go_default_library", "//staging/src/k8s.io/sample-apiserver/pkg/admission/plugin/banflunder:go_default_library",
"//staging/src/k8s.io/sample-apiserver/pkg/admission/wardleinitializer:go_default_library", "//staging/src/k8s.io/sample-apiserver/pkg/admission/wardleinitializer:go_default_library",
"//staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library", "//staging/src/k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1:go_default_library",

View File

@ -23,10 +23,13 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/admission"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options" genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/sample-apiserver/pkg/admission/plugin/banflunder" "k8s.io/sample-apiserver/pkg/admission/plugin/banflunder"
"k8s.io/sample-apiserver/pkg/admission/wardleinitializer" "k8s.io/sample-apiserver/pkg/admission/wardleinitializer"
"k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
@ -56,7 +59,7 @@ func NewWardleServerOptions(out, errOut io.Writer) *WardleServerOptions {
StdOut: out, StdOut: out,
StdErr: errOut, StdErr: errOut,
} }
o.RecommendedOptions.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1alpha1.SchemeGroupVersion, schema.GroupKind{Group: v1alpha1.GroupName})
return o return o
} }
@ -83,6 +86,7 @@ func NewCommandStartWardleServer(defaults *WardleServerOptions, stopCh <-chan st
flags := cmd.Flags() flags := cmd.Flags()
o.RecommendedOptions.AddFlags(flags) o.RecommendedOptions.AddFlags(flags)
utilfeature.DefaultMutableFeatureGate.AddFlag(flags)
return cmd return cmd
} }

View File

@ -15,6 +15,7 @@ go_library(
"crd_publish_openapi.go", "crd_publish_openapi.go",
"crd_watch.go", "crd_watch.go",
"custom_resource_definition.go", "custom_resource_definition.go",
"discovery.go",
"etcd_failure.go", "etcd_failure.go",
"framework.go", "framework.go",
"garbage_collector.go", "garbage_collector.go",
@ -65,6 +66,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library", "//staging/src/k8s.io/client-go/discovery:go_default_library",

View File

@ -0,0 +1,78 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
)
var storageVersionServerVersion = utilversion.MustParseSemantic("v1.13.99")
var _ = SIGDescribe("Discovery", func() {
f := framework.NewDefaultFramework("discovery")
var namespaceName string
BeforeEach(func() {
namespaceName = f.Namespace.Name
framework.SkipUnlessServerVersionGTE(storageVersionServerVersion, f.ClientSet.Discovery())
By("Setting up server cert")
setupServerCert(namespaceName, serviceName)
})
It("[Feature:StorageVersionHash] Custom resource should have storage version hash", func() {
testcrd, err := framework.CreateTestCRD(f)
if err != nil {
return
}
defer testcrd.CleanUp()
spec := testcrd.Crd.Spec
resources, err := testcrd.ApiExtensionClient.Discovery().ServerResourcesForGroupVersion(spec.Group + "/" + spec.Versions[0].Name)
if err != nil {
framework.Failf("failed to find the discovery doc for %v: %v", resources, err)
}
found := false
var storageVersion string
for _, v := range spec.Versions {
if v.Storage {
storageVersion = v.Name
}
}
// DISCLAIMER: the algorithm of deriving the storageVersionHash
// is an implementation detail, which shouldn't be relied on by
// the clients. The following calculation is for test purpose
// only.
expected := discovery.StorageVersionHash(spec.Group, storageVersion, spec.Names.Kind)
for _, r := range resources.APIResources {
if r.Name == spec.Names.Plural {
found = true
if r.StorageVersionHash != expected {
framework.Failf("expected storageVersionHash of %s/%s/%s to be %s, got %s", r.Group, r.Version, r.Name, expected, r.StorageVersionHash)
}
}
}
if !found {
framework.Failf("didn't find resource %s in the discovery doc", spec.Names.Plural)
}
})
})

View File

@ -101,20 +101,37 @@ var _ = framework.KubeDescribe("NodeLease", func() {
By("verify NodeStatus report period is longer than lease duration") By("verify NodeStatus report period is longer than lease duration")
// NodeStatus is reported from node to master when there is some change or // NodeStatus is reported from node to master when there is some change or
// enough time has passed. So for here, keep checking the time diff // enough time has passed. So for here, keep checking the time diff
// between 2 NodeStatus report, until it is longer than lease duration ( // between 2 NodeStatus report, until it is longer than lease duration
// the same as nodeMonitorGracePeriod). // (the same as nodeMonitorGracePeriod), or it doesn't change for at least leaseDuration
heartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, metav1.Time{}) lastHeartbeatTime := getReadyConditionHeartbeatTime(f.ClientSet, nodeName)
lastObserved := time.Now()
Eventually(func() error { Eventually(func() error {
nextHeartbeatTime := getNextReadyConditionHeartbeatTime(f.ClientSet, nodeName, heartbeatTime) currentHeartbeatTime := getReadyConditionHeartbeatTime(f.ClientSet, nodeName)
currentObserved := time.Now()
if nextHeartbeatTime.Time.After(heartbeatTime.Time.Add(leaseDuration)) { switch {
return nil case currentHeartbeatTime == lastHeartbeatTime:
if currentObserved.Sub(lastObserved) > 2*leaseDuration {
// heartbeat hasn't changed while watching for at least 2*leaseDuration, success!
framework.Logf("node status heartbeat is unchanged for %s, was waiting for at least %s, success!", currentObserved.Sub(lastObserved), 2*leaseDuration)
return nil
}
framework.Logf("node status heartbeat is unchanged for %s, waiting for %s", currentObserved.Sub(lastObserved), 2*leaseDuration)
return fmt.Errorf("node status heartbeat is unchanged for %s, waiting for %s", currentObserved.Sub(lastObserved), 2*leaseDuration)
case currentHeartbeatTime != lastHeartbeatTime:
if currentHeartbeatTime.Sub(lastHeartbeatTime) > leaseDuration {
// heartbeat time changed, but the diff was greater than leaseDuration, success!
framework.Logf("node status heartbeat changed in %s, was waiting for at least %s, success!", currentHeartbeatTime.Sub(lastHeartbeatTime), leaseDuration)
return nil
}
lastHeartbeatTime = currentHeartbeatTime
lastObserved = currentObserved
framework.Logf("node status heartbeat changed in %s, waiting for %s", currentHeartbeatTime.Sub(lastHeartbeatTime), leaseDuration)
return fmt.Errorf("node status heartbeat changed in %s, waiting for %s", currentHeartbeatTime.Sub(lastHeartbeatTime), leaseDuration)
} }
heartbeatTime = nextHeartbeatTime return nil
return fmt.Errorf("node status report period is shorter than lease duration") }, 5*time.Minute, time.Second).Should(BeNil())
// Enter next round immediately.
}, 5*time.Minute, time.Nanosecond).Should(BeNil())
By("verify node is still in ready status even though node status report is infrequent") By("verify node is still in ready status even though node status report is infrequent")
// This check on node status is only meaningful when this e2e test is // This check on node status is only meaningful when this e2e test is
@ -128,22 +145,12 @@ var _ = framework.KubeDescribe("NodeLease", func() {
}) })
}) })
func getNextReadyConditionHeartbeatTime(clientSet clientset.Interface, nodeName string, prevHeartbeatTime metav1.Time) metav1.Time { func getReadyConditionHeartbeatTime(clientSet clientset.Interface, nodeName string) time.Time {
var newHeartbeatTime metav1.Time node, err := clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
Eventually(func() error { Expect(err).To(BeNil())
node, err := clientSet.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) _, readyCondition := testutils.GetNodeCondition(&node.Status, corev1.NodeReady)
if err != nil { Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
return err return readyCondition.LastHeartbeatTime.Time
}
_, readyCondition := testutils.GetNodeCondition(&node.Status, corev1.NodeReady)
Expect(readyCondition.Status).To(Equal(corev1.ConditionTrue))
newHeartbeatTime = readyCondition.LastHeartbeatTime
if prevHeartbeatTime.Before(&newHeartbeatTime) {
return nil
}
return fmt.Errorf("heartbeat has not changed yet")
}, 5*time.Minute, 5*time.Second).Should(BeNil())
return newHeartbeatTime
} }
func expectLease(lease *coordv1beta1.Lease, nodeName string) error { func expectLease(lease *coordv1beta1.Lease, nodeName string) error {

View File

@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
FROM scratch FROM gcr.io/distroless/static:latest
COPY audit-proxy / COPY audit-proxy /
ENTRYPOINT ["/audit-proxy"] ENTRYPOINT ["/audit-proxy"]

View File

@ -1 +1 @@
1.0 1.1