k3s/test/integration/etcd/etcd_storage_path_test.go

934 lines
37 KiB
Go
Raw Normal View History

// +build integration,!no-etcd
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd
import (
"context"
"encoding/json"
"fmt"
"mime"
"net"
"net/http"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/storagebackend"
kclient "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/test/integration/framework"
// install all APIs
_ "k8s.io/kubernetes/pkg/master" // TODO what else is needed
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
)
// etcdStorageData holds the etcd test data for all persisted objects, keyed by
// GroupVersionResource. Each entry supplies the JSON stub used to create the
// object and the raw etcd key at which the object is expected to be found.
// TestEtcdStoragePath reports an error for any mapped type that has neither an
// entry here nor an entry in ephemeralWhiteList, and for any type that has both.
var etcdStorageData = map[schema.GroupVersionResource]struct {
stub string // Valid JSON stub to use during create
prerequisites []prerequisite // Optional, ordered list of JSON objects to create before stub
expectedEtcdPath string // Expected location of object in etcd, do not use any variables, constants, etc to derive this value - always supply the full raw string
expectedGVK *schema.GroupVersionKind // The GVK that we expect this object to be stored as - leave this nil to use the default
}{
// k8s.io/kubernetes/pkg/api/v1
gvr("", "v1", "configmaps"): {
stub: `{"data": {"foo": "bar"}, "metadata": {"name": "cm1"}}`,
expectedEtcdPath: "/registry/configmaps/etcdstoragepathtestnamespace/cm1",
},
gvr("", "v1", "services"): {
stub: `{"metadata": {"name": "service1"}, "spec": {"externalName": "service1name", "ports": [{"port": 10000, "targetPort": 11000}], "selector": {"test": "data"}}}`,
expectedEtcdPath: "/registry/services/specs/etcdstoragepathtestnamespace/service1",
},
gvr("", "v1", "podtemplates"): {
stub: `{"metadata": {"name": "pt1name"}, "template": {"metadata": {"labels": {"pt": "01"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container9"}]}}}`,
expectedEtcdPath: "/registry/podtemplates/etcdstoragepathtestnamespace/pt1name",
},
gvr("", "v1", "pods"): {
stub: `{"metadata": {"name": "pod1"}, "spec": {"containers": [{"image": "fedora:latest", "name": "container7", "resources": {"limits": {"cpu": "1M"}, "requests": {"cpu": "1M"}}}]}}`,
expectedEtcdPath: "/registry/pods/etcdstoragepathtestnamespace/pod1",
},
gvr("", "v1", "endpoints"): {
stub: `{"metadata": {"name": "ep1name"}, "subsets": [{"addresses": [{"hostname": "bar-001", "ip": "192.168.3.1"}], "ports": [{"port": 8000}]}]}`,
expectedEtcdPath: "/registry/services/endpoints/etcdstoragepathtestnamespace/ep1name",
},
gvr("", "v1", "resourcequotas"): {
stub: `{"metadata": {"name": "rq1name"}, "spec": {"hard": {"cpu": "5M"}}}`,
expectedEtcdPath: "/registry/resourcequotas/etcdstoragepathtestnamespace/rq1name",
},
gvr("", "v1", "limitranges"): {
stub: `{"metadata": {"name": "lr1name"}, "spec": {"limits": [{"type": "Pod"}]}}`,
expectedEtcdPath: "/registry/limitranges/etcdstoragepathtestnamespace/lr1name",
},
gvr("", "v1", "namespaces"): {
stub: `{"metadata": {"name": "namespace1"}, "spec": {"finalizers": ["kubernetes"]}}`,
expectedEtcdPath: "/registry/namespaces/namespace1",
},
gvr("", "v1", "nodes"): {
stub: `{"metadata": {"name": "node1"}, "spec": {"unschedulable": true}}`,
expectedEtcdPath: "/registry/minions/node1",
},
gvr("", "v1", "persistentvolumes"): {
stub: `{"metadata": {"name": "pv1name"}, "spec": {"accessModes": ["ReadWriteOnce"], "capacity": {"storage": "3M"}, "hostPath": {"path": "/tmp/test/"}}}`,
expectedEtcdPath: "/registry/persistentvolumes/pv1name",
},
gvr("", "v1", "events"): {
stub: `{"involvedObject": {"namespace": "etcdstoragepathtestnamespace"}, "message": "some data here", "metadata": {"name": "event1"}}`,
expectedEtcdPath: "/registry/events/etcdstoragepathtestnamespace/event1",
},
gvr("", "v1", "persistentvolumeclaims"): {
stub: `{"metadata": {"name": "pvc1"}, "spec": {"accessModes": ["ReadWriteOnce"], "resources": {"limits": {"storage": "1M"}, "requests": {"storage": "2M"}}, "selector": {"matchLabels": {"pvc": "stuff"}}}}`,
expectedEtcdPath: "/registry/persistentvolumeclaims/etcdstoragepathtestnamespace/pvc1",
},
gvr("", "v1", "serviceaccounts"): {
stub: `{"metadata": {"name": "sa1name"}, "secrets": [{"name": "secret00"}]}`,
expectedEtcdPath: "/registry/serviceaccounts/etcdstoragepathtestnamespace/sa1name",
},
gvr("", "v1", "secrets"): {
stub: `{"data": {"key": "ZGF0YSBmaWxl"}, "metadata": {"name": "secret1"}}`,
expectedEtcdPath: "/registry/secrets/etcdstoragepathtestnamespace/secret1",
},
gvr("", "v1", "replicationcontrollers"): {
stub: `{"metadata": {"name": "rc1"}, "spec": {"selector": {"new": "stuff"}, "template": {"metadata": {"labels": {"new": "stuff"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container8"}]}}}}`,
expectedEtcdPath: "/registry/controllers/etcdstoragepathtestnamespace/rc1",
},
// --
// k8s.io/kubernetes/pkg/apis/apps/v1beta1
gvr("apps", "v1beta1", "statefulsets"): {
stub: `{"metadata": {"name": "ss1"}, "spec": {"template": {"metadata": {"labels": {"a": "b"}}}}}`,
expectedEtcdPath: "/registry/statefulsets/etcdstoragepathtestnamespace/ss1",
},
gvr("apps", "v1beta1", "deployments"): {
stub: `{"metadata": {"name": "deployment2"}, "spec": {"selector": {"matchLabels": {"f": "z"}}, "template": {"metadata": {"labels": {"f": "z"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container6"}]}}}}`,
expectedEtcdPath: "/registry/deployments/etcdstoragepathtestnamespace/deployment2",
expectedGVK: gvkP("extensions", "v1beta1", "Deployment"),
},
// --
// k8s.io/kubernetes/pkg/apis/autoscaling/v1
gvr("autoscaling", "v1", "horizontalpodautoscalers"): {
stub: `{"metadata": {"name": "hpa2"}, "spec": {"maxReplicas": 3, "scaleTargetRef": {"kind": "something", "name": "cross"}}}`,
expectedEtcdPath: "/registry/horizontalpodautoscalers/etcdstoragepathtestnamespace/hpa2",
},
// --
// k8s.io/kubernetes/pkg/apis/autoscaling/v2alpha1
gvr("autoscaling", "v2alpha1", "horizontalpodautoscalers"): {
stub: `{"metadata": {"name": "hpa1"}, "spec": {"maxReplicas": 3, "scaleTargetRef": {"kind": "something", "name": "cross"}}}`,
expectedEtcdPath: "/registry/horizontalpodautoscalers/etcdstoragepathtestnamespace/hpa1",
expectedGVK: gvkP("autoscaling", "v1", "HorizontalPodAutoscaler"),
},
// --
// k8s.io/kubernetes/pkg/apis/batch/v1
gvr("batch", "v1", "jobs"): {
stub: `{"metadata": {"name": "job1"}, "spec": {"manualSelector": true, "selector": {"matchLabels": {"controller-uid": "uid1"}}, "template": {"metadata": {"labels": {"controller-uid": "uid1"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container1"}], "dnsPolicy": "ClusterFirst", "restartPolicy": "Never"}}}}`,
expectedEtcdPath: "/registry/jobs/etcdstoragepathtestnamespace/job1",
},
// --
// k8s.io/kubernetes/pkg/apis/batch/v2alpha1
gvr("batch", "v2alpha1", "cronjobs"): {
stub: `{"metadata": {"name": "cj1"}, "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"labels": {"controller-uid": "uid0"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container0"}], "dnsPolicy": "ClusterFirst", "restartPolicy": "Never"}}}}, "schedule": "* * * * *"}}`,
expectedEtcdPath: "/registry/cronjobs/etcdstoragepathtestnamespace/cj1",
},
gvr("batch", "v2alpha1", "scheduledjobs"): {
stub: `{"metadata": {"name": "cj2"}, "spec": {"jobTemplate": {"spec": {"template": {"metadata": {"labels": {"controller-uid": "uid0"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container0"}], "dnsPolicy": "ClusterFirst", "restartPolicy": "Never"}}}}, "schedule": "* * * * *"}}`,
expectedEtcdPath: "/registry/cronjobs/etcdstoragepathtestnamespace/cj2",
expectedGVK: gvkP("batch", "v2alpha1", "CronJob"), // scheduledjobs were deprecated by cronjobs
},
// --
// k8s.io/kubernetes/pkg/apis/certificates/v1beta1
gvr("certificates.k8s.io", "v1beta1", "certificatesigningrequests"): {
stub: `{"metadata": {"name": "csr1"}, "spec": {"request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQnlqQ0NBVE1DQVFBd2dZa3hDekFKQmdOVkJBWVRBbFZUTVJNd0VRWURWUVFJRXdwRFlXeHBabTl5Ym1saApNUll3RkFZRFZRUUhFdzFOYjNWdWRHRnBiaUJXYVdWM01STXdFUVlEVlFRS0V3cEhiMjluYkdVZ1NXNWpNUjh3CkhRWURWUVFMRXhaSmJtWnZjbTFoZEdsdmJpQlVaV05vYm05c2IyZDVNUmN3RlFZRFZRUURFdzUzZDNjdVoyOXYKWjJ4bExtTnZiVENCbnpBTkJna3Foa2lHOXcwQkFRRUZBQU9CalFBd2dZa0NnWUVBcFp0WUpDSEo0VnBWWEhmVgpJbHN0UVRsTzRxQzAzaGpYK1prUHl2ZFlkMVE0K3FiQWVUd1htQ1VLWUhUaFZSZDVhWFNxbFB6eUlCd2llTVpyCldGbFJRZGRaMUl6WEFsVlJEV3dBbzYwS2VjcWVBWG5uVUsrNWZYb1RJL1VnV3NocmU4dEoreC9UTUhhUUtSL0oKY0lXUGhxYVFoc0p1elpidkFkR0E4MEJMeGRNQ0F3RUFBYUFBTUEwR0NTcUdTSWIzRFFFQkJRVUFBNEdCQUlobAo0UHZGcStlN2lwQVJnSTVaTStHWng2bXBDejQ0RFRvMEprd2ZSRGYrQnRyc2FDMHE2OGVUZjJYaFlPc3E0ZmtIClEwdUEwYVZvZzNmNWlKeENhM0hwNWd4YkpRNnpWNmtKMFRFc3VhYU9oRWtvOXNkcENvUE9uUkJtMmkvWFJEMkQKNmlOaDhmOHowU2hHc0ZxakRnRkh5RjNvK2xVeWorVUM2SDFRVzdibgotLS0tLUVORCBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0="}}`,
expectedEtcdPath: "/registry/certificatesigningrequests/csr1",
},
// --
// k8s.io/kubernetes/pkg/apis/extensions/v1beta1
gvr("extensions", "v1beta1", "daemonsets"): {
stub: `{"metadata": {"name": "ds1"}, "spec": {"selector": {"matchLabels": {"u": "t"}}, "template": {"metadata": {"labels": {"u": "t"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container5"}]}}}}`,
expectedEtcdPath: "/registry/daemonsets/etcdstoragepathtestnamespace/ds1",
},
gvr("extensions", "v1beta1", "podsecuritypolicies"): {
stub: `{"metadata": {"name": "psp1"}, "spec": {"fsGroup": {"rule": "RunAsAny"}, "privileged": true, "runAsUser": {"rule": "RunAsAny"}, "seLinux": {"rule": "MustRunAs"}, "supplementalGroups": {"rule": "RunAsAny"}}}`,
expectedEtcdPath: "/registry/podsecuritypolicy/psp1",
},
gvr("extensions", "v1beta1", "thirdpartyresources"): {
stub: `{"description": "third party", "metadata": {"name": "kind.domain.tld"}, "versions": [{"name": "v3"}]}`,
expectedEtcdPath: "/registry/thirdpartyresources/kind.domain.tld",
},
gvr("extensions", "v1beta1", "ingresses"): {
stub: `{"metadata": {"name": "ingress1"}, "spec": {"backend": {"serviceName": "service", "servicePort": 5000}}}`,
expectedEtcdPath: "/registry/ingress/etcdstoragepathtestnamespace/ingress1",
},
gvr("extensions", "v1beta1", "networkpolicies"): {
stub: `{"metadata": {"name": "np1"}, "spec": {"podSelector": {"matchLabels": {"e": "f"}}}}`,
expectedEtcdPath: "/registry/networkpolicies/etcdstoragepathtestnamespace/np1",
},
gvr("extensions", "v1beta1", "deployments"): {
stub: `{"metadata": {"name": "deployment1"}, "spec": {"selector": {"matchLabels": {"f": "z"}}, "template": {"metadata": {"labels": {"f": "z"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container6"}]}}}}`,
expectedEtcdPath: "/registry/deployments/etcdstoragepathtestnamespace/deployment1",
},
gvr("extensions", "v1beta1", "replicasets"): {
stub: `{"metadata": {"name": "rs1"}, "spec": {"selector": {"matchLabels": {"g": "h"}}, "template": {"metadata": {"labels": {"g": "h"}}, "spec": {"containers": [{"image": "fedora:latest", "name": "container4"}]}}}}`,
expectedEtcdPath: "/registry/replicasets/etcdstoragepathtestnamespace/rs1",
},
// --
// k8s.io/kubernetes/pkg/apis/policy/v1beta1
gvr("policy", "v1beta1", "poddisruptionbudgets"): {
stub: `{"metadata": {"name": "pdb1"}, "spec": {"selector": {"matchLabels": {"anokkey": "anokvalue"}}}}`,
expectedEtcdPath: "/registry/poddisruptionbudgets/etcdstoragepathtestnamespace/pdb1",
},
// --
// k8s.io/kubernetes/pkg/apis/storage/v1beta1
gvr("storage.k8s.io", "v1beta1", "storageclasses"): {
stub: `{"metadata": {"name": "sc1"}, "provisioner": "aws"}`,
expectedEtcdPath: "/registry/storageclasses/sc1",
},
// --
// k8s.io/kubernetes/pkg/apis/storage/v1
gvr("storage.k8s.io", "v1", "storageclasses"): {
stub: `{"metadata": {"name": "sc2"}, "provisioner": "aws"}`,
expectedEtcdPath: "/registry/storageclasses/sc2",
expectedGVK: gvkP("storage.k8s.io", "v1beta1", "StorageClass"),
},
// --
// k8s.io/kubernetes/pkg/apis/settings/v1alpha1
gvr("settings.k8s.io", "v1alpha1", "podpresets"): {
stub: `{"metadata": {"name": "podpre1"}, "spec": {"env": [{"name": "FOO"}]}}`,
expectedEtcdPath: "/registry/podpresets/etcdstoragepathtestnamespace/podpre1",
},
// --
// k8s.io/kubernetes/pkg/apis/rbac/v1alpha1
gvr("rbac.authorization.k8s.io", "v1alpha1", "roles"): {
stub: `{"metadata": {"name": "role1"}, "rules": [{"apiGroups": ["v1"], "resources": ["events"], "verbs": ["watch"]}]}`,
expectedEtcdPath: "/registry/roles/etcdstoragepathtestnamespace/role1",
expectedGVK: gvkP("rbac.authorization.k8s.io", "v1beta1", "Role"),
},
gvr("rbac.authorization.k8s.io", "v1alpha1", "clusterroles"): {
stub: `{"metadata": {"name": "crole1"}, "rules": [{"nonResourceURLs": ["/version"], "verbs": ["get"]}]}`,
expectedEtcdPath: "/registry/clusterroles/crole1",
expectedGVK: gvkP("rbac.authorization.k8s.io", "v1beta1", "ClusterRole"),
},
gvr("rbac.authorization.k8s.io", "v1alpha1", "rolebindings"): {
stub: `{"metadata": {"name": "roleb1"}, "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "ClusterRole", "name": "somecr"}, "subjects": [{"apiVersion": "rbac.authorization.k8s.io/v1alpha1", "kind": "Group", "name": "system:authenticated"}]}`,
expectedEtcdPath: "/registry/rolebindings/etcdstoragepathtestnamespace/roleb1",
expectedGVK: gvkP("rbac.authorization.k8s.io", "v1beta1", "RoleBinding"),
},
gvr("rbac.authorization.k8s.io", "v1alpha1", "clusterrolebindings"): {
stub: `{"metadata": {"name": "croleb1"}, "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "ClusterRole", "name": "somecr"}, "subjects": [{"apiVersion": "rbac.authorization.k8s.io/v1alpha1", "kind": "Group", "name": "system:authenticated"}]}`,
expectedEtcdPath: "/registry/clusterrolebindings/croleb1",
expectedGVK: gvkP("rbac.authorization.k8s.io", "v1beta1", "ClusterRoleBinding"),
},
// --
// k8s.io/kubernetes/pkg/apis/rbac/v1beta1
gvr("rbac.authorization.k8s.io", "v1beta1", "roles"): {
stub: `{"metadata": {"name": "role2"}, "rules": [{"apiGroups": ["v1"], "resources": ["events"], "verbs": ["watch"]}]}`,
expectedEtcdPath: "/registry/roles/etcdstoragepathtestnamespace/role2",
},
gvr("rbac.authorization.k8s.io", "v1beta1", "clusterroles"): {
stub: `{"metadata": {"name": "crole2"}, "rules": [{"nonResourceURLs": ["/version"], "verbs": ["get"]}]}`,
expectedEtcdPath: "/registry/clusterroles/crole2",
},
gvr("rbac.authorization.k8s.io", "v1beta1", "rolebindings"): {
stub: `{"metadata": {"name": "roleb2"}, "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "ClusterRole", "name": "somecr"}, "subjects": [{"apiVersion": "rbac.authorization.k8s.io/v1alpha1", "kind": "Group", "name": "system:authenticated"}]}`,
expectedEtcdPath: "/registry/rolebindings/etcdstoragepathtestnamespace/roleb2",
},
gvr("rbac.authorization.k8s.io", "v1beta1", "clusterrolebindings"): {
stub: `{"metadata": {"name": "croleb2"}, "roleRef": {"apiGroup": "rbac.authorization.k8s.io", "kind": "ClusterRole", "name": "somecr"}, "subjects": [{"apiVersion": "rbac.authorization.k8s.io/v1alpha1", "kind": "Group", "name": "system:authenticated"}]}`,
expectedEtcdPath: "/registry/clusterrolebindings/croleb2",
},
// --
}
// ephemeralWhiteList is the set of resources that TestEtcdStoragePath skips
// instead of looking them up in etcd.
// Be very careful when whitelisting an object as ephemeral.
// Doing so removes the safety we gain from this test by skipping that object.
// The set is built by createEphemeralWhiteList, which panics on duplicate entries.
var ephemeralWhiteList = createEphemeralWhiteList(
// k8s.io/kubernetes/federation/apis/federation/v1beta1
gvr("federation", "v1beta1", "clusters"), // we cannot create this
// --
// k8s.io/kubernetes/pkg/api/v1
gvr("", "v1", "bindings"), // annotation on pod, not stored in etcd
gvr("", "v1", "rangeallocations"), // stored in various places in etcd but cannot be directly created
gvr("", "v1", "componentstatuses"), // status info not stored in etcd
gvr("", "v1", "serializedreferences"), // used for serialization, not stored in etcd
gvr("", "v1", "podstatusresults"), // wrapper object not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/authentication/v1beta1
gvr("authentication.k8s.io", "v1beta1", "tokenreviews"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/authentication/v1
gvr("authentication.k8s.io", "v1", "tokenreviews"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/authorization/v1beta1
// SAR objects that are not stored in etcd
gvr("authorization.k8s.io", "v1beta1", "selfsubjectaccessreviews"),
gvr("authorization.k8s.io", "v1beta1", "localsubjectaccessreviews"),
gvr("authorization.k8s.io", "v1beta1", "subjectaccessreviews"),
// --
// k8s.io/kubernetes/pkg/apis/authorization/v1
// SAR objects that are not stored in etcd
gvr("authorization.k8s.io", "v1", "selfsubjectaccessreviews"),
gvr("authorization.k8s.io", "v1", "localsubjectaccessreviews"),
gvr("authorization.k8s.io", "v1", "subjectaccessreviews"),
// --
// k8s.io/kubernetes/pkg/apis/autoscaling/v1
gvr("autoscaling", "v1", "scales"), // not stored in etcd, part of kapiv1.ReplicationController
// --
// k8s.io/kubernetes/pkg/apis/apps/v1beta1
gvr("apps", "v1beta1", "scales"), // not stored in etcd, part of kapiv1.ReplicationController
gvr("apps", "v1beta1", "deploymentrollbacks"), // used to rollback deployment, not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/batch/v2alpha1
gvr("batch", "v2alpha1", "jobtemplates"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1
gvr("componentconfig", "v1alpha1", "kubeletconfigurations"), // not stored in etcd
gvr("componentconfig", "v1alpha1", "kubeschedulerconfigurations"), // not stored in etcd
gvr("componentconfig", "v1alpha1", "kubeproxyconfigurations"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/extensions/v1beta1
gvr("extensions", "v1beta1", "deploymentrollbacks"), // used to rollback deployment, not stored in etcd
gvr("extensions", "v1beta1", "replicationcontrollerdummies"), // not stored in etcd
gvr("extensions", "v1beta1", "scales"), // not stored in etcd, part of kapiv1.ReplicationController
gvr("extensions", "v1beta1", "thirdpartyresourcedatas"), // we cannot create this
// --
// k8s.io/kubernetes/pkg/apis/imagepolicy/v1alpha1
gvr("imagepolicy.k8s.io", "v1alpha1", "imagereviews"), // not stored in etcd
// --
// k8s.io/kubernetes/pkg/apis/policy/v1beta1
gvr("policy", "v1beta1", "evictions"), // not stored in etcd, deals with evicting kapiv1.Pod
// --
)
// kindWhiteList names the kinds for which TestEtcdStoragePath tolerates a failed
// REST mapping lookup; any other kind without a mapping is reported as an error.
// Only add kinds to this list when there is no mapping from GVK to GVR (and thus there is no way to create the object)
var kindWhiteList = sets.NewString(
// k8s.io/kubernetes/pkg/api/v1
"DeleteOptions",
"ExportOptions",
"ListOptions",
"NodeProxyOptions",
"PodAttachOptions",
"PodExecOptions",
"PodLogOptions",
"PodProxyOptions",
"ServiceProxyOptions",
"GetOptions",
"APIGroup",
"PodPortForwardOptions",
"APIVersions",
// --
// k8s.io/kubernetes/pkg/watch/versioned
"WatchEvent",
// --
// k8s.io/kubernetes/pkg/api/unversioned
"Status",
// --
)
// testNamespace is the namespace used for all tests. Do not change this: the
// expectedEtcdPath values in etcdStorageData spell this name out as a raw string
// rather than deriving it from this constant.
const testNamespace = "etcdstoragepathtestnamespace"
// TestEtcdStoragePath checks that every object is stored at its expected location in etcd.
// It starts failing when a new type is added, which ensures that all future types get added to this test.
// It also fails when a type moves to a different location. Be very careful in that situation because
// it essentially means that you will break old clusters unless you create a migration path for the old data.
func TestEtcdStoragePath(t *testing.T) {
	client, kvClient, mapper := startRealMasterOrDie(t)
	// t.Failed is consulted when the deferred call runs, so the dump reflects the final test state
	defer dumpEtcdKVOnFailure(t, kvClient)

	seenKinds := sets.NewString()
	seenInEtcd := map[schema.GroupVersionResource]empty{}
	seenEphemeral := map[schema.GroupVersionResource]empty{}

	for gvk, apiType := range kapi.Scheme.AllKnownTypes() {
		// we do not care about internal objects or lists // TODO make sure this is always true
		if gvk.Version == runtime.APIVersionInternal {
			continue
		}
		if strings.HasSuffix(apiType.Name(), "List") {
			continue
		}

		kind, pkgPath := gvk.Kind, apiType.PkgPath()

		mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
		if err != nil {
			seenKinds.Insert(kind)
			// whitelisted kinds are skipped silently; anything else without a mapping is an error
			if !kindWhiteList.Has(kind) {
				t.Errorf("no mapping found for %s from %s but its GVK %s is not whitelisted", kind, pkgPath, gvk)
			}
			continue
		}

		gvResource := gvk.GroupVersion().WithResource(mapping.Resource)
		seenInEtcd[gvResource] = empty{}

		testData, hasTest := etcdStorageData[gvResource]
		_, isEphemeral := ephemeralWhiteList[gvResource]

		switch {
		case !hasTest && !isEphemeral:
			t.Errorf("no test data for %s from %s. Please add a test for your new type to etcdStorageData.", kind, pkgPath)
			continue
		case hasTest && isEphemeral:
			t.Errorf("duplicate test data for %s from %s. Object has both test data and is ephemeral.", kind, pkgPath)
			continue
		case isEphemeral: // TODO it would be nice if we could remove this and infer if an object is not stored in etcd
			seenEphemeral[gvResource] = empty{}
			delete(seenInEtcd, gvResource)
			continue
		case len(testData.expectedEtcdPath) == 0:
			t.Errorf("empty test data for %s from %s", kind, pkgPath)
			continue
		}

		// only try to create when a stub was supplied
		shouldCreate := len(testData.stub) != 0

		var input *metaObject
		if shouldCreate {
			input, err = jsonToMetaObject([]byte(testData.stub))
			if err != nil || input.isEmpty() {
				t.Errorf("invalid test data for %s from %s: %v", kind, pkgPath, err)
				continue
			}
		}

		// the closure makes the deferred cleanup run once per loop iteration
		func() {
			created := &[]cleanupData{}
			defer func() {
				// keep everything in etcd once the test has failed since it may be needed in the dump
				if t.Failed() {
					return
				}
				if err := client.cleanup(created); err != nil {
					t.Fatalf("failed to clean up etcd: %#v", err)
				}
			}()

			if err := client.createPrerequisites(mapper, testNamespace, testData.prerequisites, created); err != nil {
				t.Errorf("failed to create prerequisites for %s from %s: %#v", kind, pkgPath, err)
				return
			}

			if shouldCreate {
				if err := client.create(testData.stub, testNamespace, mapping, created); err != nil {
					t.Errorf("failed to create stub for %s from %s: %#v", kind, pkgPath, err)
					return
				}
			}

			output, err := getFromEtcd(kvClient, testData.expectedEtcdPath)
			if err != nil {
				t.Errorf("failed to get from etcd for %s from %s: %#v", kind, pkgPath, err)
				return
			}

			expectedGVK := gvk
			if override := testData.expectedGVK; override != nil {
				// an override equal to the default is flagged as an error
				if gvk == *override {
					t.Errorf("GVK override %s for %s from %s is unnecessary or something was changed incorrectly", override, kind, pkgPath)
				}
				expectedGVK = *override
			}

			if actualGVK := output.getGVK(); actualGVK != expectedGVK {
				t.Errorf("GVK for %s from %s does not match, expected %s got %s", kind, pkgPath, expectedGVK, actualGVK)
			}

			if !apiequality.Semantic.DeepDerivative(input, output) {
				t.Errorf("Test stub for %s from %s does not match: %s", kind, pkgPath, diff.ObjectGoPrintDiff(input, output))
			}
		}()
	}

	// every table must match exactly what was seen while walking the scheme
	dataOnly, seenOnly := diffMaps(etcdStorageData, seenInEtcd)
	if len(dataOnly) != 0 || len(seenOnly) != 0 {
		t.Errorf("etcd data does not match the types we saw:\nin etcd data but not seen:\n%s\nseen but not in etcd data:\n%s", dataOnly, seenOnly)
	}

	whiteListOnly, ephemeralOnly := diffMaps(ephemeralWhiteList, seenEphemeral)
	if len(whiteListOnly) != 0 || len(ephemeralOnly) != 0 {
		t.Errorf("ephemeral whitelist does not match the types we saw:\nin ephemeral whitelist but not seen:\n%s\nseen but not in ephemeral whitelist:\n%s", whiteListOnly, ephemeralOnly)
	}

	kindDataOnly, kindSeenOnly := diffMaps(kindWhiteList, seenKinds)
	if len(kindDataOnly) != 0 || len(kindSeenOnly) != 0 {
		t.Errorf("kind whitelist data does not match the types we saw:\nin kind whitelist but not seen:\n%s\nseen but not in kind whitelist:\n%s", kindDataOnly, kindSeenOnly)
	}
}
// startRealMasterOrDie launches a kube-apiserver in a background goroutine, waits up to
// ten seconds for it to answer a version request, creates testNamespace, and returns a
// client for the server, an etcd KV client for its storage backend, and a REST mapper.
// Any setup failure on the calling goroutine aborts the test via t.Fatal.
func startRealMasterOrDie(t *testing.T) (*allClient, clientv3.KV, meta.RESTMapper) {
	_, defaultServiceClusterIPRange, err := net.ParseCIDR("10.0.0.0/24")
	if err != nil {
		t.Fatal(err)
	}

	// the server goroutine publishes its configuration through these values
	kubeClientConfigValue := atomic.Value{}
	storageConfigValue := atomic.Value{}

	go func() {
		// t.Fatal/FailNow must only be called from the goroutine running the test, so
		// failures in here are recorded with t.Error and the goroutine simply returns;
		// the poll below then times out and fails the test from the right goroutine.
		for {
			kubeAPIServerOptions := options.NewServerRunOptions()
			kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
			kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURLFromEnv()}
			kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // TODO use protobuf?
			kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
			kubeAPIServerOptions.Authorization.Mode = "RBAC"

			// always get a fresh port in case something claimed the old one
			kubePort, err := framework.FindFreeLocalPort()
			if err != nil {
				t.Error(err)
				return
			}
			kubeAPIServerOptions.SecureServing.BindPort = kubePort

			kubeAPIServerConfig, sharedInformers, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions)
			if err != nil {
				t.Error(err)
				return
			}
			kubeAPIServerConfig.APIResourceConfigSource = &allResourceSource{} // force enable all resources

			kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, sharedInformers, wait.NeverStop)
			if err != nil {
				t.Error(err)
				return
			}

			kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
			storageConfigValue.Store(kubeAPIServerOptions.Etcd.StorageConfig)

			if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
				t.Log(err)
			}

			// brief pause before retrying with a fresh configuration
			time.Sleep(100 * time.Millisecond)
		}
	}()

	if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
		obj := kubeClientConfigValue.Load()
		if obj == nil {
			// the server goroutine has not published its config yet
			return false, nil
		}
		kubeClient, err := kclient.NewForConfig(obj.(*restclient.Config))
		if err != nil {
			// this happens because we race the API server start
			t.Log(err)
			return false, nil
		}
		if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
			return false, nil
		}
		return true, nil
	}); err != nil {
		t.Fatal(err)
	}

	kubeClientConfig := kubeClientConfigValue.Load().(*restclient.Config)
	storageConfig := storageConfigValue.Load().(storagebackend.Config)

	kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)
	if _, err := kubeClient.CoreV1().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: testNamespace}}); err != nil {
		t.Fatal(err)
	}

	client, err := newClient(*kubeClientConfig)
	if err != nil {
		t.Fatal(err)
	}

	kvClient, err := getEtcdKVClient(storageConfig)
	if err != nil {
		t.Fatal(err)
	}

	// only the first value returned by Object() is needed here
	mapper, _ := util.NewFactory(clientcmd.NewDefaultClientConfig(*clientcmdapi.NewConfig(), &clientcmd.ConfigOverrides{})).Object()

	return client, kvClient, mapper
}
// dumpEtcdKVOnFailure reports every key/value pair under the "/" prefix through
// t.Error, but only when the test has already failed. A failed read aborts via t.Fatal.
func dumpEtcdKVOnFailure(t *testing.T, kvClient clientv3.KV) {
	if !t.Failed() {
		return
	}

	response, err := kvClient.Get(context.Background(), "/", clientv3.WithPrefix())
	if err != nil {
		t.Fatal(err)
	}

	for i := range response.Kvs {
		entry := response.Kvs[i]
		t.Error(string(entry.Key), "->", string(entry.Value))
	}
}
// metaObject holds the stable fields to compare as a sanity check: the type meta
// (kind and apiVersion) plus the name and namespace from the object meta.
// jsonToMetaObject decodes JSON into this type. Every field is a string, which
// is what lets isEmpty compare a value against the zero metaObject with ==.
type metaObject struct {
// all of type meta
Kind string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
// parts of object meta
Metadata struct {
Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
Namespace string `json:"namespace,omitempty" protobuf:"bytes,2,opt,name=namespace"`
} `json:"metadata,omitempty" protobuf:"bytes,3,opt,name=metadata"`
}
// getGVK builds the GroupVersionKind described by the object's apiVersion and kind fields.
func (obj *metaObject) getGVK() schema.GroupVersionKind {
	apiVersion, kind := obj.APIVersion, obj.Kind
	return schema.FromAPIVersionAndKind(apiVersion, kind)
}
// isEmpty reports whether obj is nil or has no field set.
func (obj *metaObject) isEmpty() bool {
	if obj == nil {
		return true
	}
	// all fields are strings, so equality with the zero value means nothing was populated
	var zero metaObject
	return *obj == zero
}
// prerequisite describes one object that has to be created before a test stub.
// createPrerequisites resolves gvrData to a REST mapping and then creates stub with it.
type prerequisite struct {
gvrData schema.GroupVersionResource // resource the stub belongs to
stub string // JSON body sent when creating the prerequisite
}
// empty is the value type of the GroupVersionResource-keyed maps that are used as sets.
type empty struct{}
// cleanupData records one created object together with the REST mapping that
// was used to create it, so that cleanup can delete it again afterwards.
type cleanupData struct {
obj runtime.Object // object returned by the create request
mapping *meta.RESTMapping // mapping used to build the matching delete request
}
// gvr is shorthand for building a GroupVersionResource from its group, version and resource.
func gvr(g, v, r string) schema.GroupVersionResource {
	var res schema.GroupVersionResource
	res.Group, res.Version, res.Resource = g, v, r
	return res
}
// gvkP returns a pointer to a newly allocated GroupVersionKind built from group, version and kind.
func gvkP(g, v, k string) *schema.GroupVersionKind {
	out := new(schema.GroupVersionKind)
	out.Group, out.Version, out.Kind = g, v, k
	return out
}
// createEphemeralWhiteList turns the given resources into a set.
// It panics if the same resource is listed more than once.
func createEphemeralWhiteList(gvrs ...schema.GroupVersionResource) map[schema.GroupVersionResource]empty {
	whiteList := make(map[schema.GroupVersionResource]empty, len(gvrs))
	for i := range gvrs {
		if _, duplicate := whiteList[gvrs[i]]; duplicate {
			panic("invalid ephemeral whitelist contains duplicate keys")
		}
		whiteList[gvrs[i]] = empty{}
	}
	return whiteList
}
// jsonToMetaObject decodes stub into a metaObject.
// It returns a nil object together with the decoding error when stub is not valid JSON.
func jsonToMetaObject(stub []byte) (*metaObject, error) {
	var parsed metaObject
	err := json.Unmarshal(stub, &parsed)
	if err != nil {
		return nil, err
	}
	return &parsed, nil
}
// keyStringer renders a key on its own tab-indented line.
// It accepts a string or a schema.GroupVersionResource and panics for any other type.
func keyStringer(i interface{}) string {
	const prefix = "\n\t"
	if text, ok := i.(string); ok {
		return prefix + text
	}
	if resource, ok := i.(schema.GroupVersionResource); ok {
		return prefix + resource.String()
	}
	panic("unexpected type")
}
// allClient bundles what the verb method needs to build a REST request for an
// arbitrary GroupVersionKind. Instances are constructed by newClient.
type allClient struct {
client *http.Client // HTTP client handed to every request
config *restclient.Config // supplies the host, content config and rate limiter
backoff restclient.BackoffManager // backoff manager handed to every request
}
// verb builds a REST request using the given HTTP verb for the group and version of gvk.
// An error is returned when the server URL or the serializers cannot be set up.
func (c *allClient) verb(verb string, gvk schema.GroupVersionKind) (*restclient.Request, error) {
	gv := gvk.GroupVersion()

	// objects in kapi.GroupName use the /api prefix, every other group uses /apis
	var apiPath string
	switch gvk.Group {
	case kapi.GroupName:
		apiPath = "/api"
	default:
		apiPath = "/apis"
	}

	baseURL, versionedAPIPath, err := restclient.DefaultServerURL(c.config.Host, apiPath, gv, true)
	if err != nil {
		return nil, err
	}

	// work on a copy so the shared config does not get the per-request group version
	contentConfig := c.config.ContentConfig
	contentConfig.GroupVersion = &gv

	serializers, err := createSerializers(contentConfig)
	if err != nil {
		return nil, err
	}

	request := restclient.NewRequest(c.client, verb, baseURL, versionedAPIPath, contentConfig, *serializers, c.backoff, c.config.RateLimiter)
	return request, nil
}
// create POSTs stub to the resource described by mapping, using namespace ns when the
// resource is namespace scoped. On success the returned object is appended to all so
// that it can be deleted later.
func (c *allClient) create(stub, ns string, mapping *meta.RESTMapping, all *[]cleanupData) error {
	req, err := c.verb("POST", mapping.GroupVersionKind)
	if err != nil {
		return err
	}

	isNamespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace
	req = req.NamespaceIfScoped(ns, isNamespaced)
	req = req.Resource(mapping.Resource)
	req = req.Body(strings.NewReader(stub))

	created, err := req.Do().Get()
	if err != nil {
		return err
	}

	*all = append(*all, cleanupData{obj: created, mapping: mapping})
	return nil
}
// destroy issues a DELETE for obj, taking its name and namespace from the mapping's
// metadata accessor. The namespace is only applied for namespace scoped resources.
func (c *allClient) destroy(obj runtime.Object, mapping *meta.RESTMapping) error {
	req, err := c.verb("DELETE", mapping.GroupVersionKind)
	if err != nil {
		return err
	}

	isNamespaced := mapping.Scope.Name() == meta.RESTScopeNameNamespace

	accessor := mapping.MetadataAccessor
	objName, err := accessor.Name(obj)
	if err != nil {
		return err
	}
	objNamespace, err := accessor.Namespace(obj)
	if err != nil {
		return err
	}

	req = req.NamespaceIfScoped(objNamespace, isNamespaced)
	req = req.Resource(mapping.Resource)
	req = req.Name(objName)
	return req.Do().Error()
}
// cleanup deletes every recorded object, newest first, and stops at the first error.
func (c *allClient) cleanup(all *[]cleanupData) error {
	entries := *all
	// walk backwards in case the creation order mattered
	for remaining := len(entries); remaining > 0; remaining-- {
		entry := entries[remaining-1]
		if err := c.destroy(entry.obj, entry.mapping); err != nil {
			return err
		}
	}
	return nil
}
// createPrerequisites creates each prerequisite object in the order given.
// For every entry it resolves the resource to a kind and then to a REST
// mapping via mapper, and creates the entry's stub in namespace ns. Created
// objects are recorded in all. The first error aborts the loop and is
// returned.
func (c *allClient) createPrerequisites(mapper meta.RESTMapper, ns string, prerequisites []prerequisite, all *[]cleanupData) error {
	for i := range prerequisites {
		prereq := prerequisites[i]

		kind, err := mapper.KindFor(prereq.gvrData)
		if err != nil {
			return err
		}

		restMapping, err := mapper.RESTMapping(kind.GroupKind(), kind.Version)
		if err != nil {
			return err
		}

		if err := c.create(prereq.stub, ns, restMapping, all); err != nil {
			return err
		}
	}
	return nil
}
// newClient builds an allClient from config. The config is received by value,
// so the overrides applied here (JSON content type, kapi.Codecs as the
// negotiated serializer, a 30 second timeout and a token-bucket rate limiter)
// only affect the copy retained by the returned client.
func newClient(config restclient.Config) (*allClient, error) {
	config.ContentConfig.NegotiatedSerializer = kapi.Codecs
	config.ContentConfig.ContentType = "application/json"
	config.Timeout = 30 * time.Second
	config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(3, 10)

	roundTripper, err := restclient.TransportFor(&config)
	if err != nil {
		return nil, err
	}

	httpClient := &http.Client{Transport: roundTripper, Timeout: config.Timeout}
	urlBackoff := &restclient.URLBackoff{
		Backoff: flowcontrol.NewBackOff(1*time.Second, 10*time.Second),
	}

	return &allClient{client: httpClient, config: &config, backoff: urlBackoff}, nil
}
// createSerializers assembles the encoder, decoder and (when available)
// streaming serializer for the content type named in config. It returns an
// error when the content type cannot be parsed or no serializer is registered
// for it. config.GroupVersion must be non-nil; it is dereferenced here.
//
// copied from restclient
func createSerializers(config restclient.ContentConfig) (*restclient.Serializers, error) {
	negotiator := config.NegotiatedSerializer
	supported := negotiator.SupportedMediaTypes()
	requested := config.ContentType

	parsedType, _, err := mime.ParseMediaType(requested)
	if err != nil {
		return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
	}

	selected, found := runtime.SerializerInfoForMediaType(supported, parsedType)
	if !found {
		if len(requested) != 0 || len(supported) == 0 {
			return nil, fmt.Errorf("no serializers registered for %s", requested)
		}
		selected = supported[0]
	}

	decodeTargets := schema.GroupVersions{
		{Group: config.GroupVersion.Group, Version: runtime.APIVersionInternal},
		// always include the legacy group as a decoding target to handle non-error `Status` return types
		{Group: "", Version: runtime.APIVersionInternal},
	}

	// renegotiate picks a decoder for a content type other than the one
	// selected above; params is accepted but not consulted.
	renegotiate := func(mediaType string, params map[string]string) (runtime.Decoder, error) {
		alternate, registered := runtime.SerializerInfoForMediaType(supported, mediaType)
		if !registered {
			return nil, fmt.Errorf("serializer for %s not registered", mediaType)
		}
		return negotiator.DecoderToVersion(alternate.Serializer, decodeTargets), nil
	}

	result := &restclient.Serializers{
		Encoder:             negotiator.EncoderForVersion(selected.Serializer, *config.GroupVersion),
		Decoder:             negotiator.DecoderToVersion(selected.Serializer, decodeTargets),
		RenegotiatedDecoder: renegotiate,
	}
	if stream := selected.StreamSerializer; stream != nil {
		result.StreamingSerializer = stream.Serializer
		result.Framer = stream.Framer
	}
	return result, nil
}
// getFromEtcd reads the value stored at path and converts it with
// jsonToMetaObject. Anything other than exactly one key/value pair in the
// response is reported as an error.
func getFromEtcd(keys clientv3.KV, path string) (*metaObject, error) {
	resp, err := keys.Get(context.Background(), path)
	if err != nil {
		return nil, err
	}

	exactlyOne := !resp.More && resp.Count == 1 && len(resp.Kvs) == 1
	if !exactlyOne {
		return nil, fmt.Errorf("Invalid etcd response (not found == %v): %#v", resp.Count == 0, resp)
	}

	return jsonToMetaObject(resp.Kvs[0].Value)
}
// diffMaps compares the key sets of two maps. The first result lists the keys
// present only in a, the second the keys present only in b; each key is
// rendered with keyStringer.
func diffMaps(a, b interface{}) ([]string, []string) {
	return diffMapKeys(a, b, keyStringer), diffMapKeys(b, a, keyStringer)
}
// diffMapKeys returns the keys of map a that are not keys of map b, each
// rendered through stringer. Keys are compared as interface values, so keys of
// different dynamic types never match. The result is always non-nil and its
// order follows map iteration order, which is unspecified.
//
// a must be a map; b must be a map whenever a is non-empty. Otherwise the
// underlying reflect calls panic.
func diffMapKeys(a, b interface{}, stringer func(interface{}) string) []string {
	ret := []string{}

	aKeys := reflect.ValueOf(a).MapKeys()
	if len(aKeys) == 0 {
		// Nothing to report; b is deliberately not inspected in this case.
		return ret
	}

	// Index b's keys once so that each key of a costs a single lookup instead
	// of a full scan over b. Map lookup on interface{} keys uses the same
	// equality as the == operator on interface values.
	bKeys := reflect.ValueOf(b).MapKeys()
	inB := make(map[interface{}]struct{}, len(bKeys))
	for _, kb := range bKeys {
		inB[kb.Interface()] = struct{}{}
	}

	for _, ka := range aKeys {
		key := ka.Interface()
		if _, found := inB[key]; !found {
			ret = append(ret, stringer(key))
		}
	}
	return ret
}
// getEtcdKVClient connects to the etcd servers listed in config, using the
// certificate, key and CA files named there for TLS, and returns a key/value
// client. The underlying etcd client is not exposed to the caller and is not
// closed by this function.
func getEtcdKVClient(config storagebackend.Config) (clientv3.KV, error) {
	certs := transport.TLSInfo{
		CertFile: config.CertFile,
		KeyFile:  config.KeyFile,
		CAFile:   config.CAFile,
	}

	clientTLS, err := certs.ClientConfig()
	if err != nil {
		return nil, err
	}

	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints: config.ServerList,
		TLS:       clientTLS,
	})
	if err != nil {
		return nil, err
	}

	return clientv3.NewKV(etcdClient), nil
}
// allResourceSource answers every "is this enabled?" query with true, for any
// resource, version or group.
type allResourceSource struct{}

// AnyVersionOfResourceEnabled always reports true.
func (*allResourceSource) AnyVersionOfResourceEnabled(resource schema.GroupResource) bool {
	return true
}

// AllResourcesForVersionEnabled always reports true.
func (*allResourceSource) AllResourcesForVersionEnabled(version schema.GroupVersion) bool {
	return true
}

// AnyResourcesForGroupEnabled always reports true.
func (*allResourceSource) AnyResourcesForGroupEnabled(group string) bool {
	return true
}

// ResourceEnabled always reports true.
func (*allResourceSource) ResourceEnabled(resource schema.GroupVersionResource) bool {
	return true
}

// AnyResourcesForVersionEnabled always reports true.
func (*allResourceSource) AnyResourcesForVersionEnabled(version schema.GroupVersion) bool {
	return true
}