federation: Add admission controller for policy-based placement

pull/6/head
Torin Sandall 2017-04-18 10:34:45 -07:00
parent 0cff839317
commit 470e99c6a5
10 changed files with 1017 additions and 0 deletions

View File

@ -30,6 +30,7 @@ filegroup(
"//federation/pkg/federatedtypes:all-srcs",
"//federation/pkg/federation-controller:all-srcs",
"//federation/pkg/kubefed:all-srcs",
"//federation/plugin/pkg/admission/schedulingpolicy:all-srcs",
"//federation/registry/cluster:all-srcs",
],
tags = ["automanaged"],

View File

@ -28,6 +28,7 @@ go_library(
"//federation/apis/federation/install:go_default_library",
"//federation/apis/federation/v1beta1:go_default_library",
"//federation/cmd/federation-apiserver/app/options:go_default_library",
"//federation/plugin/pkg/admission/schedulingpolicy:go_default_library",
"//federation/registry/cluster/etcd:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/install:go_default_library",

View File

@ -25,6 +25,7 @@ import (
// Admission policies
"k8s.io/apiserver/pkg/admission"
"k8s.io/kubernetes/federation/plugin/pkg/admission/schedulingpolicy"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
"k8s.io/kubernetes/plugin/pkg/admission/deny"
"k8s.io/kubernetes/plugin/pkg/admission/gc"
@ -37,4 +38,5 @@ func registerAllAdmissionPlugins(plugins *admission.Plugins) {
deny.Register(plugins)
gc.Register(plugins)
initialization.Register(plugins)
schedulingpolicy.Register(plugins)
}

View File

@ -0,0 +1,70 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"admission_test.go",
"merge_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"admission.go",
"merge.go",
"query.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/ref:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/yaml:go_default_library",
"//vendor/k8s.io/apiserver/pkg/admission:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/webhook:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/clientcmd:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,213 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package schedulingpolicy implements a webhook that queries an external API
// to obtain scheduling decisions for Federated sources.
package schedulingpolicy
import (
"fmt"
"io"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apiserver/pkg/admission"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/ref"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)
const (
pluginName = "SchedulingPolicy"
configKey = "schedulingPolicy"
policyConfigMapNamespace = "kube-federation-scheduling-policy"
// Default backoff delay for policy engine query retries. The actual
// backoff implementation is handled by k8s.io/apiserver/pkg/util/webhook.
// If the admission controller config file does not specify a backoff, this
// one is used.
defaultRetryBackoff = time.Millisecond * 100
)
type admissionConfig struct {
Kubeconfig string `json:"kubeconfig"`
RetryBackoff time.Duration `json:"retryBackoff"`
}
type admissionController struct {
*admission.Handler
policyEngineClient *rest.RESTClient // client to communicate with policy engine
policyEngineRetryBackoff time.Duration // backoff for policy engine queries
client internalclientset.Interface // client to communicate with federation-apiserver
}
// Register registers the plugin.
func Register(plugins *admission.Plugins) {
plugins.Register(pluginName, func(file io.Reader) (admission.Interface, error) {
return newAdmissionController(file)
})
}
func newAdmissionController(file io.Reader) (*admissionController, error) {
config, err := loadConfig(file)
if err != nil {
return nil, err
}
policyEngineClient, err := loadRestClient(config.Kubeconfig)
if err != nil {
return nil, err
}
c := &admissionController{
Handler: admission.NewHandler(admission.Create, admission.Update),
policyEngineClient: policyEngineClient,
policyEngineRetryBackoff: config.RetryBackoff,
}
return c, nil
}
func (c *admissionController) Validate() error {
if c.client == nil {
return fmt.Errorf("%s requires a client", pluginName)
}
return nil
}
func (c *admissionController) SetInternalKubeClientSet(client internalclientset.Interface) {
c.client = client
}
func (c *admissionController) Admit(a admission.Attributes) (err error) {
exists, err := c.policyExists()
if err != nil {
return c.handleError(a, err)
}
if !exists {
return nil
}
obj := a.GetObject()
decision, err := newPolicyEngineQuery(c.policyEngineClient, c.policyEngineRetryBackoff, obj, a.GetKind()).Do()
if err != nil {
return c.handleError(a, err)
}
if err := decision.Error(); err != nil {
return c.handleError(a, err)
}
mergeAnnotations(obj, decision.Annotations)
return nil
}
func (c *admissionController) handleError(a admission.Attributes, err error) error {
c.publishEvent(a, err.Error())
return admission.NewForbidden(a, err)
}
func (c *admissionController) publishEvent(a admission.Attributes, msg string) {
obj := a.GetObject()
ref, err := ref.GetReference(api.Scheme, obj)
if err != nil {
runtime.HandleError(err)
return
}
event := &api.Event{
InvolvedObject: *ref,
Message: msg,
Source: api.EventSource{
Component: fmt.Sprintf("schedulingpolicy"),
},
Type: "Warning",
}
if _, err := c.client.Core().Events(a.GetNamespace()).Create(event); err != nil {
runtime.HandleError(err)
return
}
}
func (c *admissionController) policyExists() (bool, error) {
lst, err := c.client.Core().ConfigMaps(policyConfigMapNamespace).List(metav1.ListOptions{})
if err != nil {
return true, err
}
return len(lst.Items) > 0, nil
}
func loadConfig(file io.Reader) (*admissionConfig, error) {
var cfg admissionConfig
if file == nil {
return nil, fmt.Errorf("--admission-control-config-file not specified or invalid")
}
if err := yaml.NewYAMLOrJSONDecoder(file, 4096).Decode(&cfg); err != nil {
return nil, err
}
if len(cfg.Kubeconfig) == 0 {
return nil, fmt.Errorf("kubeconfig path must not be empty")
}
if cfg.RetryBackoff == 0 {
cfg.RetryBackoff = defaultRetryBackoff
} else {
// Scale up value from config (which is unmarshalled as ns).
cfg.RetryBackoff *= time.Millisecond
}
if cfg.RetryBackoff.Nanoseconds() < 0 {
return nil, fmt.Errorf("retryBackoff must not be negative")
}
return &cfg, nil
}
func loadRestClient(kubeConfigFile string) (*rest.RESTClient, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = kubeConfigFile
loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, &clientcmd.ConfigOverrides{})
clientConfig, err := loader.ClientConfig()
if err != nil {
return nil, err
}
clientConfig.ContentConfig.NegotiatedSerializer = dynamic.ContentConfig().NegotiatedSerializer
restClient, err := rest.UnversionedRESTClientFor(clientConfig)
if err != nil {
return nil, err
}
return restClient, nil
}

View File

@ -0,0 +1,473 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulingpolicy
import (
"bytes"
"encoding/json"
"fmt"
"html/template"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"reflect"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/admission"
"k8s.io/apiserver/pkg/authentication/user"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
)
func TestNewAdmissionController(t *testing.T) {
tempfile, err := ioutil.TempFile("", "")
if err != nil {
t.Fatalf("Unexpected error while creating temporary file: %v", err)
}
p := tempfile.Name()
defer os.Remove(p)
kubeconfig := `
clusters:
- name: foo
cluster:
server: https://example.com
users:
- name: alice
user:
token: deadbeef
contexts:
- name: default
context:
cluster: foo
user: alice
current-context: default
`
if _, err := tempfile.WriteString(kubeconfig); err != nil {
t.Fatalf("Unexpected error while writing test kubeconfig file: %v", err)
}
tests := []struct {
note string
input string
wantErr bool
}{
{"no config", "", true},
{"bad json", `{"foo": `, true},
{"bad yaml", `{foo" `, true},
{
"missing kubeconfig",
`{"foo": {}}`,
true,
},
{
"kubeconfig not found",
`{
"kubeconfig": "/kube-federation-scheduling-policy-file-not-found-test"
}`,
true,
},
{
"bad retry backoff",
fmt.Sprintf(`
{
"kubeconfig": %q,
"retryBackoff": -1
}
`, p),
true,
},
{
"a valid config",
fmt.Sprintf(`
{
"kubeconfig": %q
}
`, p),
false,
},
{
"a valid config with retry backoff",
fmt.Sprintf(`
{
"kubeconfig": %q,
"retryBackoff": 200
}
`, p),
false,
},
}
for _, tc := range tests {
var file io.Reader
if tc.input == "" {
file = nil
} else {
file = bytes.NewBufferString(tc.input)
}
_, err := newAdmissionController(file)
if tc.wantErr && err == nil {
t.Errorf("%v: Expected error", tc.note)
} else if !tc.wantErr && err != nil {
t.Errorf("%v: Unexpected error: %v", tc.note, err)
}
}
}
func TestAdmitQueryPayload(t *testing.T) {
var body interface{}
serve := func(w http.ResponseWriter, r *http.Request) {
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatalf("Unexpected error reading admission payload: %v", err)
}
// No errors or annotations.
w.Write([]byte(`{}`))
}
controller, err := newControllerWithTestServer(serve, true)
if err != nil {
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
}
rs := makeReplicaSet()
rs.Spec.MinReadySeconds = 100
attrs := makeAdmissionRecord(rs)
err = controller.Admit(attrs)
if err != nil {
t.Fatalf("Unexpected error from admission controller: %v", err)
}
obj := body.(map[string]interface{})
metadata := obj["metadata"].(map[string]interface{})
spec := obj["spec"].(map[string]interface{})
name := metadata["name"].(string)
minReadySeconds := spec["minReadySeconds"].(float64)
expectedName := "myapp"
if name != expectedName {
t.Fatalf("Expected replicaset.metadata.name to be %v but got: %v", expectedName, name)
}
expectedMinReadySeconds := float64(100)
if minReadySeconds != expectedMinReadySeconds {
t.Fatalf("Expected replicaset.spec.minReadySeconds to be %v but got: %v", expectedMinReadySeconds, minReadySeconds)
}
}
func TestAdmitFailInternal(t *testing.T) {
serve := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
controller, err := newControllerWithTestServer(serve, false)
if err != nil {
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
}
mockClient := &fake.Clientset{}
mockClient.AddReactor("list", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("unknown error")
})
controller.SetInternalKubeClientSet(mockClient)
attrs := makeAdmissionRecord(makeReplicaSet())
err = controller.Admit(attrs)
if err == nil {
t.Fatalf("Expected admission controller to fail closed")
}
}
func TestAdmitPolicyDoesNotExist(t *testing.T) {
controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(404)
}, false)
if err != nil {
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
}
attrs := makeAdmissionRecord(makeReplicaSet())
err = controller.Admit(attrs)
if err != nil {
t.Fatalf("Expected admission controller to fail open but got error: %v", err)
}
}
func TestAdmitFailClosed(t *testing.T) {
tests := []struct {
note string
statusCode int
body string
}{
{"server error", 500, ""},
{"unmarshal error", 200, "{"},
{"undefined result", 404, ``},
{"policy errors", 200, `{"errors": ["conflicting replica-set-preferences"]}`},
}
for _, tc := range tests {
serve := func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.statusCode)
if len(tc.body) > 0 {
w.Write([]byte(tc.body))
}
}
controller, err := newControllerWithTestServer(serve, true)
if err != nil {
t.Errorf("%v: Unexpected error while creating test admission controller/server: %v", tc.note, err)
continue
}
obj := makeReplicaSet()
attrs := admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{})
err = controller.Admit(attrs)
if err == nil {
t.Errorf("%v: Expected admission controller to fail closed", tc.note)
}
}
}
func TestAdmitRetries(t *testing.T) {
var numQueries int
controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(500)
numQueries++
}, true)
if err != nil {
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
}
err = controller.Admit(makeAdmissionRecord(makeReplicaSet()))
if err == nil {
t.Fatalf("Expected admission controller to fail closed")
}
if numQueries <= 1 {
t.Fatalf("Expected multiple queries/retries but got (numQueries): %v", numQueries)
}
}
func TestAdmitSuccessWithAnnotationMerge(t *testing.T) {
controller, err := newControllerWithTestServer(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`
{
"annotations": {
"foo": "bar-2"
}
}
`))
}, true)
if err != nil {
t.Fatalf("Unexpected error while creating test admission controller/server: %v", err)
}
obj := makeReplicaSet()
obj.Annotations = map[string]string{}
obj.Annotations["foo"] = "bar"
obj.Annotations["bar"] = "baz"
attrs := admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{})
err = controller.Admit(attrs)
if err != nil {
t.Fatalf("Unexpected error from admission controller: %v", err)
}
annotations := attrs.GetObject().(*extensionsv1.ReplicaSet).Annotations
expected := map[string]string{
"foo": "bar-2",
"bar": "baz",
}
if !reflect.DeepEqual(annotations, expected) {
t.Fatalf("Expected annotations to be %v but got: %v", expected, annotations)
}
}
func newControllerWithTestServer(f func(w http.ResponseWriter, r *http.Request), policiesExist bool) (*admissionController, error) {
server, err := newTestServer(f)
if err != nil {
return nil, err
}
kubeConfigFile, err := makeKubeConfigFile(server.URL, "/some/path/to/decision")
if err != nil {
return nil, err
}
defer os.Remove(kubeConfigFile)
configFile, err := makeAdmissionControlConfigFile(kubeConfigFile)
if err != nil {
return nil, err
}
defer os.Remove(configFile)
file, err := os.Open(configFile)
if err != nil {
return nil, err
}
controller, err := newAdmissionController(file)
if err != nil {
return nil, err
}
mockClient := &fake.Clientset{}
var items []api.ConfigMap
if policiesExist {
items = append(items, api.ConfigMap{})
}
mockClient.AddReactor("list", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
if action.GetNamespace() == policyConfigMapNamespace {
return true, &api.ConfigMapList{Items: items}, nil
}
return true, nil, nil
})
controller.SetInternalKubeClientSet(mockClient)
return controller, nil
}
func newTestServer(f func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, error) {
server := httptest.NewUnstartedServer(http.HandlerFunc(f))
server.Start()
return server, nil
}
func makeAdmissionControlConfigFile(kubeConfigFile string) (string, error) {
tempfile, err := ioutil.TempFile("", "")
if err != nil {
return "", err
}
p := tempfile.Name()
configFileTmpl := `
kubeconfig: {{ .KubeConfigFile }}
retryBackoff: {{ .RetryBackoff }}
`
type configFileTemplateInput struct {
KubeConfigFile string
RetryBackoff int
}
input := configFileTemplateInput{
KubeConfigFile: kubeConfigFile,
RetryBackoff: 1,
}
tmpl, err := template.New("scheduling-policy-config").Parse(configFileTmpl)
if err != nil {
return "", err
}
if err := tmpl.Execute(tempfile, input); err != nil {
return "", err
}
return p, nil
}
func makeKubeConfigFile(baseURL, path string) (string, error) {
tempfile, err := ioutil.TempFile("", "")
if err != nil {
return "", err
}
p := tempfile.Name()
kubeConfigTmpl := `
clusters:
- name: test
cluster:
server: {{ .BaseURL }}{{ .Path }}
users:
- name: alice
user:
token: deadbeef
contexts:
- name: default
context:
cluster: test
user: alice
current-context: default`
type kubeConfigTemplateInput struct {
BaseURL string
Path string
}
input := kubeConfigTemplateInput{
BaseURL: baseURL,
Path: path,
}
tmpl, err := template.New("kubeconfig").Parse(kubeConfigTmpl)
if err != nil {
return "", err
}
if err := tmpl.Execute(tempfile, input); err != nil {
return "", err
}
return p, nil
}
func makeAdmissionRecord(obj *extensionsv1.ReplicaSet) admission.Attributes {
return admission.NewAttributesRecord(obj, nil, obj.GroupVersionKind(), obj.Namespace, obj.Name, api.Resource("replicasets").WithVersion("version"), "", admission.Create, &user.DefaultInfo{})
}
func makeReplicaSet() *extensionsv1.ReplicaSet {
return &extensionsv1.ReplicaSet{
TypeMeta: metav1.TypeMeta{
Kind: "ReplicaSet",
APIVersion: "extensions/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "myapp",
},
Spec: extensionsv1.ReplicaSetSpec{},
}
}

View File

@ -0,0 +1,45 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulingpolicy
import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
)
// mergeAnnotations updates obj so that the provided annotations supersede the
// existing annotations.
func mergeAnnotations(obj runtime.Object, annotations map[string]string) error {
if len(annotations) == 0 {
return nil
}
accessor, err := meta.Accessor(obj)
if err != nil {
return err
}
orig := accessor.GetAnnotations()
for k := range orig {
if _, ok := annotations[k]; !ok {
annotations[k] = orig[k]
}
}
accessor.SetAnnotations(annotations)
return nil
}

View File

@ -0,0 +1,66 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulingpolicy
import (
"encoding/json"
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestMergeAnnotations(t *testing.T) {
tests := []struct {
note string
input *v1.Pod
annotations string
expected string
}{
{"nil annotations", &v1.Pod{}, `{"foo": "bar"}`, `{"foo": "bar"}`},
{"empty annotations", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}}, `{"foo": "bar"}`, `{"foo": "bar"}`},
{"existing annotation", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"foo": "baz"}}}, `{"foo": "bar"}`, `{"foo": "bar"}`},
{"different annotation", &v1.Pod{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"baz": "qux"}}}, `{"foo": "bar"}`, `{"baz": "qux", "foo": "bar"}`},
}
for _, tc := range tests {
annotations := map[string]string{}
if err := json.Unmarshal([]byte(tc.annotations), &annotations); err != nil {
panic(err)
}
expected := map[string]string{}
if err := json.Unmarshal([]byte(tc.expected), &expected); err != nil {
panic(err)
}
err := mergeAnnotations(tc.input, annotations)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if !reflect.DeepEqual(tc.input.ObjectMeta.Annotations, expected) {
t.Errorf("%v: Expected annotations to equal %v but got: %v", tc.note, expected, tc.input.ObjectMeta.Annotations)
}
}
}

View File

@ -0,0 +1,145 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulingpolicy
import (
"bytes"
"encoding/json"
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api"
)
// policyUndefinedError represents an undefined response from the policy
// engine. This typically means the relevant policy has not been loaded into
// the engine.
type policyUndefinedError struct{}
func (policyUndefinedError) Error() string {
return "policy decision is undefined"
}
// policyEngineQuery represents a single query against the policy engine.
type policyEngineQuery struct {
client *rest.RESTClient
retryBackoff time.Duration
obj runtime.Object
gvk schema.GroupVersionKind
}
// newPolicyEngineQuery returns a policyEngineQuery that can be executed.
func newPolicyEngineQuery(client *rest.RESTClient, retryBackoff time.Duration, obj runtime.Object, gvk schema.GroupVersionKind) *policyEngineQuery {
return &policyEngineQuery{
client: client,
retryBackoff: retryBackoff,
obj: obj,
gvk: gvk,
}
}
// Do returns the result of the policy engine query. If the policy decision is
// undefined or an unknown error occurs, err is non-nil. Otherwise, result is
// non-nil and contains the result of policy evaluation.
func (query *policyEngineQuery) Do() (decision *policyDecision, err error) {
bs, err := query.encode()
if err != nil {
return nil, err
}
var result rest.Result
err = webhook.WithExponentialBackoff(query.retryBackoff, func() error {
result = query.client.Post().
Body(bs).
Do()
return result.Error()
})
if err != nil {
if errors.IsNotFound(err) {
return nil, policyUndefinedError{}
}
return nil, err
}
return decodeResult(result)
}
// encode returns the encoded version of the query's runtime.Object.
func (query *policyEngineQuery) encode() ([]byte, error) {
var info runtime.SerializerInfo
infos := api.Codecs.SupportedMediaTypes()
for i := range infos {
if infos[i].MediaType == "application/json" {
info = infos[i]
}
}
if info.Serializer == nil {
return nil, fmt.Errorf("serialization not supported")
}
codec := api.Codecs.EncoderForVersion(info.Serializer, query.gvk.GroupVersion())
var buf bytes.Buffer
if err := codec.Encode(query.obj, &buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// policyDecision represents a response from the policy engine.
type policyDecision struct {
Errors []string `json:"errors,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
}
// Error returns an error if the policy raised an error.
func (d *policyDecision) Error() error {
if len(d.Errors) == 0 {
return nil
}
return fmt.Errorf("reason(s): %v", strings.Join(d.Errors, "; "))
}
func decodeResult(result rest.Result) (*policyDecision, error) {
bs, err := result.Raw()
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(bs)
var decision policyDecision
if err := json.NewDecoder(buf).Decode(&decision); err != nil {
return nil, err
}
return &decision, nil
}

View File

@ -51,6 +51,7 @@ federation/cmd/federation-controller-manager
federation/cmd/genfeddocs
federation/cmd/kubefed
federation/pkg/federation-controller/util/replicapreferences
federation/plugin/pkg/admission/schedulingpolicy
hack
hack/boilerplate/test
hack/cmd/teststale