Merge pull request #71548 from smarterclayton/watch_converted

Support Table and PartialObjectMetadata on watch
k3s-v1.15.3
Kubernetes Prow Robot 2019-03-19 22:42:22 -07:00 committed by GitHub
commit 6f9bf5fe98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1274 additions and 191 deletions

View File

@ -22,10 +22,11 @@ import (
fuzz "github.com/google/gofuzz"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
"k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
genericfuzzer "k8s.io/apimachinery/pkg/apis/meta/fuzzer"
metafuzzer "k8s.io/apimachinery/pkg/apis/meta/fuzzer"
"k8s.io/apimachinery/pkg/runtime"
runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
admissionregistrationfuzzer "k8s.io/kubernetes/pkg/apis/admissionregistration/fuzzer"
@ -105,4 +106,5 @@ var FuzzerFuncs = fuzzer.MergeFuzzerFuncs(
auditregistrationfuzzer.Funcs,
storagefuzzer.Funcs,
networkingfuzzer.Funcs,
metafuzzer.Funcs,
)

View File

@ -509,14 +509,17 @@ func (h *HumanReadablePrinter) PrintTable(obj runtime.Object, options PrintOptio
return nil, results[1].Interface().(error)
}
columns := handler.columnDefinitions
if !options.Wide {
columns = make([]metav1beta1.TableColumnDefinition, 0, len(handler.columnDefinitions))
for i := range handler.columnDefinitions {
if handler.columnDefinitions[i].Priority != 0 {
continue
var columns []metav1beta1.TableColumnDefinition
if !options.NoHeaders {
columns = handler.columnDefinitions
if !options.Wide {
columns = make([]metav1beta1.TableColumnDefinition, 0, len(handler.columnDefinitions))
for i := range handler.columnDefinitions {
if handler.columnDefinitions[i].Priority != 0 {
continue
}
columns = append(columns, handler.columnDefinitions[i])
}
columns = append(columns, handler.columnDefinitions[i])
}
}
table := &metav1beta1.Table{

View File

@ -430,14 +430,14 @@ func AddHandlers(h printers.PrintHandler) {
h.TableHandler(controllerRevisionColumnDefinition, printControllerRevision)
h.TableHandler(controllerRevisionColumnDefinition, printControllerRevisionList)
resorceQuotaColumnDefinitions := []metav1beta1.TableColumnDefinition{
resourceQuotaColumnDefinitions := []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
{Name: "Request", Type: "string", Description: "Request represents a minimum amount of cpu/memory that a container may consume."},
{Name: "Limit", Type: "string", Description: "Limits control the maximum amount of cpu/memory that a container may use independent of contention on the node."},
}
h.TableHandler(resorceQuotaColumnDefinitions, printResourceQuota)
h.TableHandler(resorceQuotaColumnDefinitions, printResourceQuotaList)
h.TableHandler(resourceQuotaColumnDefinitions, printResourceQuota)
h.TableHandler(resourceQuotaColumnDefinitions, printResourceQuotaList)
priorityClassColumnDefinitions := []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
@ -456,6 +456,17 @@ func AddHandlers(h printers.PrintHandler) {
h.TableHandler(runtimeClassColumnDefinitions, printRuntimeClass)
h.TableHandler(runtimeClassColumnDefinitions, printRuntimeClassList)
volumeAttachmentColumnDefinitions := []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
{Name: "Attacher", Type: "string", Format: "name", Description: storagev1.VolumeAttachmentSpec{}.SwaggerDoc()["attacher"]},
{Name: "PV", Type: "string", Description: storagev1.VolumeAttachmentSource{}.SwaggerDoc()["persistentVolumeName"]},
{Name: "Node", Type: "string", Description: storagev1.VolumeAttachmentSpec{}.SwaggerDoc()["nodeName"]},
{Name: "Attached", Type: "boolean", Description: storagev1.VolumeAttachmentStatus{}.SwaggerDoc()["attached"]},
{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
}
h.TableHandler(volumeAttachmentColumnDefinitions, printVolumeAttachment)
h.TableHandler(volumeAttachmentColumnDefinitions, printVolumeAttachmentList)
AddDefaultHandlers(h)
}
@ -1996,6 +2007,34 @@ func printRuntimeClassList(list *nodeapi.RuntimeClassList, options printers.Prin
rows := make([]metav1beta1.TableRow, 0, len(list.Items))
for i := range list.Items {
r, err := printRuntimeClass(&list.Items[i], options)
if err != nil {
return nil, err
}
rows = append(rows, r...)
}
return rows, nil
}
func printVolumeAttachment(obj *storage.VolumeAttachment, options printers.PrintOptions) ([]metav1beta1.TableRow, error) {
row := metav1beta1.TableRow{
Object: runtime.RawExtension{Object: obj},
}
name := obj.Name
pvName := ""
if obj.Spec.Source.PersistentVolumeName != nil {
pvName = *obj.Spec.Source.PersistentVolumeName
}
row.Cells = append(row.Cells, name, obj.Spec.Attacher, pvName, obj.Spec.NodeName, obj.Status.Attached, translateTimestampSince(obj.CreationTimestamp))
return []metav1beta1.TableRow{row}, nil
}
func printVolumeAttachmentList(list *storage.VolumeAttachmentList, options printers.PrintOptions) ([]metav1beta1.TableRow, error) {
rows := make([]metav1beta1.TableRow, 0, len(list.Items))
for i := range list.Items {
r, err := printVolumeAttachment(&list.Items[i], options)
if err != nil {
return nil, err
}

View File

@ -18,6 +18,7 @@ package storage
import (
"context"
"fmt"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
@ -29,5 +30,16 @@ type TableConvertor struct {
}
func (c TableConvertor) ConvertToTable(ctx context.Context, obj runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
return c.TablePrinter.PrintTable(obj, printers.PrintOptions{Wide: true})
noHeaders := false
if tableOptions != nil {
switch t := tableOptions.(type) {
case *metav1beta1.TableOptions:
if t != nil {
noHeaders = t.NoHeaders
}
default:
return nil, fmt.Errorf("unrecognized type %T for table options, can't display tabular output", tableOptions)
}
}
return c.TablePrinter.PrintTable(obj, printers.PrintOptions{Wide: true, NoHeaders: noHeaders})
}

View File

@ -7,6 +7,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/storage:go_default_library",
"//pkg/printers:go_default_library",
"//pkg/printers/internalversion:go_default_library",
"//pkg/printers/storage:go_default_library",
"//pkg/registry/storage/volumeattachment:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -25,6 +25,9 @@ import (
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
storageapi "k8s.io/kubernetes/pkg/apis/storage"
"k8s.io/kubernetes/pkg/printers"
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
printerstorage "k8s.io/kubernetes/pkg/printers/storage"
"k8s.io/kubernetes/pkg/registry/storage/volumeattachment"
)
@ -50,6 +53,8 @@ func NewStorage(optsGetter generic.RESTOptionsGetter) *VolumeAttachmentStorage {
UpdateStrategy: volumeattachment.Strategy,
DeleteStrategy: volumeattachment.Strategy,
ReturnDeletedObject: true,
TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {

View File

@ -1110,6 +1110,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -36,4 +36,11 @@ go_test(
name = "go_default_test",
srcs = ["tableconvertor_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/client-go/util/jsonpath:go_default_library",
],
)

View File

@ -76,9 +76,13 @@ type convertor struct {
}
func (c *convertor) ConvertToTable(ctx context.Context, obj runtime.Object, tableOptions runtime.Object) (*metav1beta1.Table, error) {
table := &metav1beta1.Table{
ColumnDefinitions: c.headers,
table := &metav1beta1.Table{}
opt, ok := tableOptions.(*metav1beta1.TableOptions)
noHeaders := ok && opt != nil && opt.NoHeaders
if !noHeaders {
table.ColumnDefinitions = c.headers
}
if m, err := meta.ListAccessor(obj); err == nil {
table.ResourceVersion = m.GetResourceVersion()
table.SelfLink = m.GetSelfLink()

View File

@ -17,10 +17,17 @@ limitations under the License.
package tableconvertor
import (
"context"
"fmt"
"reflect"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/util/jsonpath"
)
func Test_cellForJSONValue(t *testing.T) {
@ -66,3 +73,149 @@ func Test_cellForJSONValue(t *testing.T) {
})
}
}
func Test_convertor_ConvertToTable(t *testing.T) {
type fields struct {
headers []metav1beta1.TableColumnDefinition
additionalColumns []*jsonpath.JSONPath
}
type args struct {
ctx context.Context
obj runtime.Object
tableOptions runtime.Object
}
tests := []struct {
name string
fields fields
args args
want *metav1beta1.Table
wantErr bool
}{
{
name: "Return table for object",
fields: fields{
headers: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
},
args: args{
obj: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
tableOptions: nil,
},
want: &metav1beta1.Table{
ColumnDefinitions: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
Rows: []metav1beta1.TableRow{
{
Cells: []interface{}{"blah"},
Object: runtime.RawExtension{
Object: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
},
},
},
},
},
{
name: "Return table for list",
fields: fields{
headers: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
},
args: args{
obj: &metav1beta1.PartialObjectMetadataList{
Items: []*metav1beta1.PartialObjectMetadata{
{ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))}},
{ObjectMeta: metav1.ObjectMeta{Name: "blah-2", CreationTimestamp: metav1.NewTime(time.Unix(2, 0))}},
},
},
tableOptions: nil,
},
want: &metav1beta1.Table{
ColumnDefinitions: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
Rows: []metav1beta1.TableRow{
{
Cells: []interface{}{"blah"},
Object: runtime.RawExtension{
Object: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
},
},
{
Cells: []interface{}{"blah-2"},
Object: runtime.RawExtension{
Object: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah-2", CreationTimestamp: metav1.NewTime(time.Unix(2, 0))},
},
},
},
},
},
},
{
name: "Accept TableOptions",
fields: fields{
headers: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
},
args: args{
obj: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
tableOptions: &metav1beta1.TableOptions{},
},
want: &metav1beta1.Table{
ColumnDefinitions: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
Rows: []metav1beta1.TableRow{
{
Cells: []interface{}{"blah"},
Object: runtime.RawExtension{
Object: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
},
},
},
},
},
{
name: "Omit headers from TableOptions",
fields: fields{
headers: []metav1beta1.TableColumnDefinition{{Name: "name", Type: "string"}},
},
args: args{
obj: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
tableOptions: &metav1beta1.TableOptions{NoHeaders: true},
},
want: &metav1beta1.Table{
Rows: []metav1beta1.TableRow{
{
Cells: []interface{}{"blah"},
Object: runtime.RawExtension{
Object: &metav1beta1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{Name: "blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0))},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &convertor{
headers: tt.fields.headers,
additionalColumns: tt.fields.additionalColumns,
}
got, err := c.ConvertToTable(tt.args.ctx, tt.args.obj, tt.args.tableOptions)
if (err != nil) != tt.wantErr {
t.Errorf("convertor.ConvertToTable() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("convertor.ConvertToTable() = %s", diff.ObjectReflectDiff(tt.want, got))
}
})
}
}

View File

@ -23,7 +23,7 @@ import (
"strconv"
"strings"
"github.com/google/gofuzz"
fuzz "github.com/google/gofuzz"
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
"k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
@ -282,8 +282,14 @@ func v1FuzzerFuncs(codecs runtimeserializer.CodecFactory) []interface{} {
}
}
func v1alpha1FuzzerFuncs(codecs runtimeserializer.CodecFactory) []interface{} {
func v1beta1FuzzerFuncs(codecs runtimeserializer.CodecFactory) []interface{} {
return []interface{}{
func(r *metav1beta1.TableOptions, c fuzz.Continue) {
c.FuzzNoCustom(r)
// NoHeaders is not serialized to the wire but is allowed within the versioned
// type because we don't use meta internal types in the client and API server.
r.NoHeaders = false
},
func(r *metav1beta1.TableRow, c fuzz.Continue) {
c.Fuzz(&r.Object)
c.Fuzz(&r.Conditions)
@ -326,5 +332,5 @@ func v1alpha1FuzzerFuncs(codecs runtimeserializer.CodecFactory) []interface{} {
var Funcs = fuzzer.MergeFuzzerFuncs(
genericFuzzerFuncs,
v1FuzzerFuncs,
v1alpha1FuzzerFuncs,
v1beta1FuzzerFuncs,
)

View File

@ -34,7 +34,10 @@ filegroup(
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
srcs = [
":package-srcs",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -133,6 +133,10 @@ const (
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type TableOptions struct {
v1.TypeMeta `json:",inline"`
// NoHeaders is only exposed for internal callers.
NoHeaders bool `json:"-"`
// includeObject decides whether to include each object along with its columnar information.
// Specifying "None" will return no object, specifying "Object" will return the full object contents, and
// specifying "Metadata" (the default) will return the object's metadata in the PartialObjectMetadata kind

View File

@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["validation.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation",
importpath = "k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,33 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package validation
import (
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/util/validation/field"
)
// ValidateTableOptions returns any invalid flags on TableOptions.
func ValidateTableOptions(opts *metav1beta1.TableOptions) field.ErrorList {
var allErrs field.ErrorList
switch opts.IncludeObject {
case metav1beta1.IncludeMetadata, metav1beta1.IncludeNone, metav1beta1.IncludeObject, "":
default:
allErrs = append(allErrs, field.Invalid(field.NewPath("includeObject"), opts.IncludeObject, "must be 'Metadata', 'Object', 'None', or empty"))
}
return allErrs
}

View File

@ -43,6 +43,8 @@ type TypeMeta struct {
const (
ContentTypeJSON string = "application/json"
ContentTypeYAML string = "application/yaml"
ContentTypeProtobuf string = "application/vnd.kubernetes.protobuf"
)
// RawExtension is used to hold extensions in external versions.

View File

@ -1134,6 +1134,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -46,6 +46,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/audit/policy:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/testing:go_default_library",

View File

@ -37,7 +37,7 @@ import (
"testing"
"time"
"github.com/emicklei/go-restful"
restful "github.com/emicklei/go-restful"
fuzzer "k8s.io/apimachinery/pkg/api/apitesting/fuzzer"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
@ -51,6 +51,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -64,6 +65,7 @@ import (
"k8s.io/apiserver/pkg/audit"
auditpolicy "k8s.io/apiserver/pkg/audit/policy"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
genericapitesting "k8s.io/apiserver/pkg/endpoints/testing"
@ -1804,6 +1806,10 @@ func TestGetTable(t *testing.T) {
accept: runtime.ContentTypeJSON + ";as=Table;v=v1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
accept: runtime.ContentTypeProtobuf + ";as=Table;v=v1beta1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
item: true,
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
@ -1819,6 +1825,24 @@ func TestGetTable(t *testing.T) {
},
},
},
{
item: true,
accept: strings.Join([]string{
runtime.ContentTypeProtobuf + ";as=Table;v=v1beta1;g=meta.k8s.io",
runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
}, ","),
expected: &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", now.Time.UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
},
},
{
item: true,
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
@ -1918,6 +1942,216 @@ func TestGetTable(t *testing.T) {
}
}
func TestWatchTable(t *testing.T) {
obj := genericapitesting.Simple{
ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "ns1", ResourceVersion: "10", SelfLink: "/blah", CreationTimestamp: metav1.NewTime(time.Unix(1, 0)), UID: types.UID("abcdef0123")},
Other: "foo",
}
m, err := meta.Accessor(&obj)
if err != nil {
t.Fatal(err)
}
partial := meta.AsPartialObjectMetadata(m)
partial.GetObjectKind().SetGroupVersionKind(metav1beta1.SchemeGroupVersion.WithKind("PartialObjectMetadata"))
encodedBody, err := runtime.Encode(metainternalversion.Codecs.LegacyCodec(metav1beta1.SchemeGroupVersion), partial)
if err != nil {
t.Fatal(err)
}
// the codec includes a trailing newline that is not present during decode
encodedBody = bytes.TrimSpace(encodedBody)
metaDoc := metav1.ObjectMeta{}.SwaggerDoc()
s := metainternalversion.Codecs.SupportedMediaTypes()[0].Serializer
tests := []struct {
accept string
params url.Values
send func(w *watch.FakeWatcher)
expected []*metav1.WatchEvent
contentType string
statusCode int
item bool
}{
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1;g=meta.k8s.io",
statusCode: http.StatusNotAcceptable,
},
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
send: func(w *watch.FakeWatcher) {
w.Add(&obj)
},
expected: []*metav1.WatchEvent{
{
Type: "ADDED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
},
},
{
accept: runtime.ContentTypeJSON + ";as=Table;v=v1beta1;g=meta.k8s.io",
send: func(w *watch.FakeWatcher) {
w.Add(&obj)
w.Modify(&obj)
},
expected: []*metav1.WatchEvent{
{
Type: "ADDED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
ColumnDefinitions: []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metaDoc["name"]},
{Name: "Created At", Type: "date", Description: metaDoc["creationTimestamp"]},
},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
{
Type: "MODIFIED",
Object: runtime.RawExtension{
Raw: []byte(strings.TrimSpace(runtime.EncodeOrDie(s, &metav1beta1.Table{
TypeMeta: metav1.TypeMeta{Kind: "Table", APIVersion: "meta.k8s.io/v1beta1"},
ListMeta: metav1.ListMeta{ResourceVersion: "10", SelfLink: "/blah"},
Rows: []metav1beta1.TableRow{
{Cells: []interface{}{"foo1", time.Unix(1, 0).UTC().Format(time.RFC3339)}, Object: runtime.RawExtension{Raw: encodedBody}},
},
}))),
},
},
},
},
}
for i, test := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: obj,
list: []genericapitesting.Simple{obj},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
}
if test.item {
selfLinker.expectedSet += "/id"
selfLinker.name = "id"
}
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
var id string
if test.item {
id = "/id"
}
u, err := url.Parse(server.URL + "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple")
if err != nil {
t.Fatal(err)
}
if test.params == nil {
test.params = url.Values{}
}
if test.item {
test.params["fieldSelector"] = []string{fmt.Sprintf("metadata.name=%s", id)}
}
test.params["watch"] = []string{"1"}
u.RawQuery = test.params.Encode()
req := &http.Request{Method: "GET", URL: u}
req.Header = http.Header{}
req.Header.Set("Accept", test.accept)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
if test.statusCode != 0 {
if resp.StatusCode != test.statusCode {
t.Fatalf("%d: unexpected response: %#v", i, resp)
}
obj, _, err := extractBodyObject(resp, unstructured.UnstructuredJSONScheme)
if err != nil {
t.Fatalf("%d: unexpected body read error: %v", i, err)
}
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Status"}
if obj.GetObjectKind().GroupVersionKind() != gvk {
t.Fatalf("%d: unexpected error body: %#v", i, obj)
}
return
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("%d: unexpected response: %#v", i, resp)
}
go func() {
defer simpleStorage.fakeWatch.Stop()
test.send(simpleStorage.fakeWatch)
}()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
t.Logf("Body:\n%s", string(body))
d := watcher(resp.Header.Get("Content-Type"), ioutil.NopCloser(bytes.NewReader(body)))
var actual []*metav1.WatchEvent
for {
var event metav1.WatchEvent
_, _, err := d.Decode(nil, &event)
if err == io.EOF {
break
}
if err != nil {
t.Fatal(err)
}
actual = append(actual, &event)
}
if !reflect.DeepEqual(test.expected, actual) {
for i := range test.expected {
if i >= len(actual) {
break
}
t.Logf("%s", diff.StringDiff(string(test.expected[i].Object.Raw), string(actual[i].Object.Raw)))
}
t.Fatalf("unexpected: %s", diff.ObjectReflectDiff(test.expected, actual))
}
})
}
}
func watcher(mediaType string, r io.ReadCloser) streaming.Decoder {
info, ok := runtime.SerializerInfoForMediaType(metainternalversion.Codecs.SupportedMediaTypes(), mediaType)
if !ok || info.StreamSerializer == nil {
panic(info)
}
streamSerializer := info.StreamSerializer
fr := streamSerializer.Framer.NewFrameReader(r)
d := streaming.NewDecoder(fr, streamSerializer.Serializer)
return d
}
func TestGetPartialObjectMetadata(t *testing.T) {
now := metav1.Time{metav1.Now().Rfc3339Copy().Local()}
storage := map[string]rest.Storage{}
@ -3618,7 +3852,7 @@ func (obj *UnregisteredAPIObject) DeepCopyObject() runtime.Object {
func TestWriteJSONDecodeError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(codecs, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
responsewriters.WriteObjectNegotiated(codecs, negotiation.DefaultEndpointRestrictions, newGroupVersion, w, req, http.StatusOK, &UnregisteredAPIObject{"Undecodable"})
}))
defer server.Close()
// Decode error response behavior is dictated by

View File

@ -69,5 +69,5 @@ func (s *APIGroupHandler) handle(req *restful.Request, resp *restful.Response) {
}
func (s *APIGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, &s.group)
}

View File

@ -72,5 +72,5 @@ func (s *legacyRootAPIHandler) handle(req *restful.Request, resp *restful.Respon
Versions: []string{"v1"},
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp.ResponseWriter, req.Request, http.StatusOK, apiVersions)
}

View File

@ -111,7 +111,7 @@ func (s *rootAPIsHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
groups[i].ServerAddressByClientCIDRs = serverCIDR
}
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, resp, req, http.StatusOK, &metav1.APIGroupList{Groups: groups})
}
func (s *rootAPIsHandler) restfulHandle(req *restful.Request, resp *restful.Response) {

View File

@ -78,6 +78,6 @@ func (s *APIVersionHandler) handle(req *restful.Request, resp *restful.Response)
}
func (s *APIVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
responsewriters.WriteObjectNegotiated(s.serializer, schema.GroupVersion{}, w, req, http.StatusOK,
responsewriters.WriteObjectNegotiated(s.serializer, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK,
&metav1.APIResourceList{GroupVersion: s.groupVersion.String(), APIResources: s.apiResourceLister.ListAPIResources()})
}

View File

@ -61,6 +61,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -257,7 +257,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch
}
requestInfo, _ := request.RequestInfoFrom(ctx)
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, req, w, timeout)
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}

View File

@ -47,6 +47,34 @@ func (e errNotAcceptable) Status() metav1.Status {
}
}
// errNotAcceptableConversion indicates Accept negotiation has failed specifically
// for a conversion to a known type.
type errNotAcceptableConversion struct {
target string
accepted []string
}
// NewNotAcceptableConversionError returns an error indicating that the desired
// API transformation to the target group version kind string is not accepted and
// only the listed mime types are allowed. This is temporary while Table does not
// yet support protobuf encoding.
func NewNotAcceptableConversionError(target string, accepted []string) error {
return errNotAcceptableConversion{target, accepted}
}
func (e errNotAcceptableConversion) Error() string {
return fmt.Sprintf("only the following media types are accepted when converting to %s: %v", e.target, strings.Join(e.accepted, ", "))
}
func (e errNotAcceptableConversion) Status() metav1.Status {
return metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusNotAcceptable,
Reason: metav1.StatusReasonNotAcceptable,
Message: e.Error(),
}
}
// errUnsupportedMediaType indicates Content-Type is not recognized
type errUnsupportedMediaType struct {
accepted []string

View File

@ -56,15 +56,9 @@ func NegotiateOutputMediaType(req *http.Request, ns runtime.NegotiatedSerializer
return mediaType, info, nil
}
// NegotiateOutputSerializer returns a serializer for the output.
func NegotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
_, info, err := NegotiateOutputMediaType(req, ns, DefaultEndpointRestrictions)
return info, err
}
// NegotiateOutputStreamSerializer returns a stream serializer for the given request.
func NegotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), DefaultEndpointRestrictions)
// NegotiateOutputMediaTypeStream returns a stream serializer for the given request.
func NegotiateOutputMediaTypeStream(req *http.Request, ns runtime.NegotiatedSerializer, restrictions EndpointRestrictions) (runtime.SerializerInfo, error) {
mediaType, ok := NegotiateMediaTypeOptions(req.Header.Get("Accept"), AcceptedMediaTypesForEndpoint(ns), restrictions)
if !ok || mediaType.Accepted.Serializer.StreamSerializer == nil {
_, supported := MediaTypesForSerializer(ns)
return runtime.SerializerInfo{}, NewNotAcceptableError(supported)
@ -124,7 +118,7 @@ func isPrettyPrint(req *http.Request) bool {
type EndpointRestrictions interface {
// AllowsConversion should return true if the specified group version kind
// is an allowed target object.
AllowsConversion(schema.GroupVersionKind) bool
AllowsConversion(target schema.GroupVersionKind, mimeType, mimeSubType string) bool
// AllowsServerVersion should return true if the specified version is valid
// for the server group.
AllowsServerVersion(version string) bool
@ -139,9 +133,11 @@ var DefaultEndpointRestrictions = emptyEndpointRestrictions{}
type emptyEndpointRestrictions struct{}
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind) bool { return false }
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
func (emptyEndpointRestrictions) AllowsConversion(schema.GroupVersionKind, string, string) bool {
return false
}
func (emptyEndpointRestrictions) AllowsServerVersion(string) bool { return false }
func (emptyEndpointRestrictions) AllowsStreamSchema(s string) bool { return s == "watch" }
// AcceptedMediaType contains information about a valid media type that the
// server can serialize.
@ -240,7 +236,7 @@ func acceptMediaTypeOptions(params map[string]string, accepts *AcceptedMediaType
}
}
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert) {
if options.Convert != nil && !endpoint.AllowsConversion(*options.Convert, accepts.Type, accepts.SubType) {
return MediaTypeOptions{}, false
}

View File

@ -231,7 +231,7 @@ func TestNegotiate(t *testing.T) {
req = &http.Request{Header: http.Header{}}
req.Header.Set("Accept", test.accept)
}
s, err := NegotiateOutputSerializer(req, test.ns)
_, s, err := NegotiateOutputMediaType(req, test.ns, DefaultEndpointRestrictions)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: expected error", i)

View File

@ -26,86 +26,97 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
)
// transformObject takes the object as returned by storage and ensures it is in
// the client's desired form, as well as ensuring any API level fields like self-link
// are properly set.
func transformObject(ctx context.Context, obj runtime.Object, opts interface{}, mediaType negotiation.MediaTypeOptions, scope RequestScope, req *http.Request) (runtime.Object, error) {
if _, ok := obj.(*metav1.Status); ok {
return obj, nil
}
if err := setObjectSelfLink(ctx, obj, req, scope.Namer); err != nil {
return nil, err
}
switch target := mediaType.Convert; {
case target == nil:
return obj, nil
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return asV1Beta1PartialObjectMetadata(obj)
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return asV1Beta1PartialObjectMetadataList(obj)
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
options, ok := opts.(*metav1beta1.TableOptions)
if !ok {
return nil, fmt.Errorf("unexpected TableOptions, got %T", opts)
}
return asV1Beta1Table(ctx, obj, options, scope)
default:
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
return nil, err
}
}
// optionsForTransform will load and validate any additional query parameter options for
// a conversion or return an error.
func optionsForTransform(mediaType negotiation.MediaTypeOptions, req *http.Request) (interface{}, error) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
opts := &metav1beta1.TableOptions{}
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
return nil, err
}
switch errs := validation.ValidateTableOptions(opts); len(errs) {
case 0:
return opts, nil
case 1:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs[0].Error()))
default:
return nil, errors.NewBadRequest(fmt.Sprintf("Unable to convert to Table as requested: %v", errs))
}
}
return nil, nil
}
// targetEncodingForTransform returns the appropriate serializer for the input media type
func targetEncodingForTransform(scope *RequestScope, mediaType negotiation.MediaTypeOptions, req *http.Request) (schema.GroupVersionKind, runtime.NegotiatedSerializer, bool) {
switch target := mediaType.Convert; {
case target == nil:
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion,
target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion,
target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
return *target, metainternalversion.Codecs, true
}
return scope.Kind, scope.Serializer, false
}
// transformResponseObject takes an object loaded from storage and performs any necessary transformations.
// Will write the complete response object.
func transformResponseObject(ctx context.Context, scope RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
// status objects are ignored for transformation
if _, ok := result.(*metav1.Status); ok {
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
return
}
// ensure the self link and empty list array are set
if err := setObjectSelfLink(ctx, result, req, scope.Namer); err != nil {
options, err := optionsForTransform(mediaType, req)
if err != nil {
scope.err(err, w, req)
return
}
trace := scope.Trace
// If conversion was allowed by the scope, perform it before writing the response
switch target := mediaType.Convert; {
case target == nil:
trace.Step("Writing response")
responsewriters.WriteObject(statusCode, scope.Kind.GroupVersion(), scope.Serializer, result, w, req)
case target.Kind == "PartialObjectMetadata" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
partial, err := asV1Beta1PartialObjectMetadata(result)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(partial, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
case target.Kind == "PartialObjectMetadataList" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
trace.Step("Processing list items")
partial, err := asV1Beta1PartialObjectMetadataList(result)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(partial, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
case target.Kind == "Table" && target.GroupVersion() == metav1beta1.SchemeGroupVersion:
opts := &metav1beta1.TableOptions{}
trace.Step("Decoding parameters")
if err := metav1beta1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1beta1.SchemeGroupVersion, opts); err != nil {
scope.err(err, w, req)
return
}
table, err := asV1Beta1Table(ctx, result, opts, scope)
if err != nil {
scope.err(err, w, req)
return
}
if err := writeMetaInternalVersion(table, statusCode, w, req, &scope, target.GroupVersion()); err != nil {
scope.err(err, w, req)
return
}
default:
// this block should only be hit if scope AllowsConversion is incorrect
accepted, _ := negotiation.MediaTypesForSerializer(metainternalversion.Codecs)
err := negotiation.NewNotAcceptableError(accepted)
obj, err := transformObject(ctx, result, options, mediaType, scope, req)
if err != nil {
scope.err(err, w, req)
return
}
kind, serializer, _ := targetEncodingForTransform(&scope, mediaType, req)
responsewriters.WriteObjectNegotiated(serializer, &scope, kind.GroupVersion(), w, req, statusCode, obj)
}
// errNotAcceptable indicates Accept negotiation has failed
@ -131,15 +142,11 @@ func (e errNotAcceptable) Status() metav1.Status {
}
func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta1.TableOptions, scope RequestScope) (runtime.Object, error) {
trace := scope.Trace
trace.Step("Converting to table")
table, err := scope.TableConvertor.ConvertToTable(ctx, result, opts)
if err != nil {
return nil, err
}
trace.Step("Processing rows")
for i := range table.Rows {
item := &table.Rows[i]
switch opts.IncludeObject {
@ -161,7 +168,6 @@ func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta
case metav1beta1.IncludeNone:
item.Object.Object = nil
default:
// TODO: move this to validation on the table options?
err = errors.NewBadRequest(fmt.Sprintf("unrecognized includeObject value: %q", opts.IncludeObject))
return nil, err
}
@ -172,7 +178,6 @@ func asV1Beta1Table(ctx context.Context, result runtime.Object, opts *metav1beta
func asV1Beta1PartialObjectMetadata(result runtime.Object) (runtime.Object, error) {
if meta.IsListType(result) {
// TODO: this should be calculated earlier
err := newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadata, but the requested object is a list (%T)", result))
return nil, err
}
@ -187,7 +192,6 @@ func asV1Beta1PartialObjectMetadata(result runtime.Object) (runtime.Object, erro
func asV1Beta1PartialObjectMetadataList(result runtime.Object) (runtime.Object, error) {
if !meta.IsListType(result) {
// TODO: this should be calculated earlier
return nil, newNotAcceptableError(fmt.Sprintf("you requested PartialObjectMetadataList, but the requested object is not a list (%T)", result))
}
list := &metav1beta1.PartialObjectMetadataList{}
@ -206,14 +210,3 @@ func asV1Beta1PartialObjectMetadataList(result runtime.Object) (runtime.Object,
}
return list, nil
}
func writeMetaInternalVersion(obj runtime.Object, statusCode int, w http.ResponseWriter, req *http.Request, restrictions negotiation.EndpointRestrictions, target schema.GroupVersion) error {
// renegotiate under the internal version
_, info, err := negotiation.NegotiateOutputMediaType(req, metainternalversion.Codecs, restrictions)
if err != nil {
return err
}
encoder := metainternalversion.Codecs.EncoderForVersion(info.Serializer, target)
responsewriters.SerializeObject(info.MediaType, encoder, w, req, statusCode, obj)
return nil
}

View File

@ -55,23 +55,6 @@ func (w httpResponseWriterWithInit) Write(b []byte) (n int, err error) {
return w.innerW.Write(b)
}
// WriteObject renders a returned runtime.Object to the response as a stream or an encoded object. If the object
// returned by the response implements rest.ResourceStreamer that interface will be used to render the
// response. The Accept header and current API version will be passed in, and the output will be copied
// directly to the response body. If content type is returned it is used, otherwise the content type will
// be "application/octet-stream". All other objects are sent to standard JSON serialization.
func WriteObject(statusCode int, gv schema.GroupVersion, s runtime.NegotiatedSerializer, object runtime.Object, w http.ResponseWriter, req *http.Request) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
WriteObjectNegotiated(s, gv, w, req, statusCode, object)
}
// StreamObject performs input stream negotiation from a ResourceStreamer and writes that to the response.
// If the client requests a websocket upgrade, negotiate for a websocket reader protocol (because many
// browser clients cannot easily handle binary streaming protocols).
@ -123,9 +106,17 @@ func SerializeObject(mediaType string, encoder runtime.Encoder, innerW http.Resp
}
// WriteObjectNegotiated renders an object in the content type negotiated by the client.
// The context is optional and can be nil.
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
serializer, err := negotiation.NegotiateOutputSerializer(req, s)
func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
stream, ok := object.(rest.ResourceStreamer)
if ok {
requestInfo, _ := request.RequestInfoFrom(req.Context())
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
StreamObject(statusCode, gv, s, stream, w, req)
})
return
}
_, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)
if err != nil {
// if original statusCode was not successful we need to return the original error
// we cannot hide it behind negotiation problems
@ -162,7 +153,7 @@ func ErrorNegotiated(err error, s runtime.NegotiatedSerializer, gv schema.GroupV
return code
}
WriteObjectNegotiated(s, gv, w, req, code, status)
WriteObjectNegotiated(s, negotiation.DefaultEndpointRestrictions, gv, w, req, code, status)
return code
}

View File

@ -79,12 +79,14 @@ func (scope *RequestScope) err(err error, w http.ResponseWriter, req *http.Reque
responsewriters.ErrorNegotiated(err, scope.Serializer, scope.Kind.GroupVersion(), w, req)
}
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind) bool {
func (scope *RequestScope) AllowsConversion(gvk schema.GroupVersionKind, mimeType, mimeSubType string) bool {
// TODO: this is temporary, replace with an abstraction calculated at endpoint installation time
if gvk.GroupVersion() == metav1beta1.SchemeGroupVersion {
switch gvk.Kind {
case "Table":
return scope.TableConvertor != nil
return scope.TableConvertor != nil &&
mimeType == "application" &&
(mimeSubType == "json" || mimeSubType == "yaml")
case "PartialObjectMetadata", "PartialObjectMetadataList":
// TODO: should delineate between lists and non-list endpoints
return true
@ -172,7 +174,7 @@ type responder struct {
}
func (r *responder) Object(statusCode int, obj runtime.Object) {
responsewriters.WriteObject(statusCode, r.scope.Kind.GroupVersion(), r.scope.Serializer, obj, r.w, r.req)
responsewriters.WriteObjectNegotiated(r.scope.Serializer, &r.scope, r.scope.Kind.GroupVersion(), r.w, r.req, statusCode, obj)
}
func (r *responder) Error(err error) {

View File

@ -25,13 +25,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/httplog"
"k8s.io/apiserver/pkg/util/wsstream"
@ -61,42 +61,56 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
// serveWatch handles serving requests to the server
// serveWatch will serve a watch response.
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
// negotiate for the stream serializer
serializer, err := negotiation.NegotiateOutputStreamSerializer(req, scope.Serializer)
func serveWatch(watcher watch.Interface, scope RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
options, err := optionsForTransform(mediaTypeOptions, req)
if err != nil {
scope.err(err, w, req)
return
}
// negotiate for the stream serializer from the scope's serializer
serializer, err := negotiation.NegotiateOutputMediaTypeStream(req, scope.Serializer, &scope)
if err != nil {
scope.err(err, w, req)
return
}
framer := serializer.StreamSerializer.Framer
streamSerializer := serializer.StreamSerializer.Serializer
embedded := serializer.Serializer
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
if framer == nil {
scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", serializer.MediaType), w, req)
return
}
encoder := scope.Serializer.EncoderForVersion(streamSerializer, scope.Kind.GroupVersion())
useTextFraming := serializer.EncodesAsText
// find the embedded serializer matching the media type
embeddedEncoder := scope.Serializer.EncoderForVersion(embedded, scope.Kind.GroupVersion())
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer.MediaType
if mediaType != runtime.ContentTypeJSON {
mediaType += ";stream=watch"
}
ctx := req.Context()
requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
scope.err(fmt.Errorf("missing requestInfo"), w, req)
return
// locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime.Encoder
contentKind, contentSerializer, transform := targetEncodingForTransform(&scope, mediaTypeOptions, req)
if transform {
var embedded runtime.Serializer
for _, supported := range contentSerializer.SupportedMediaTypes() {
if supported.MediaType == serializer.MediaType {
embedded = supported.Serializer
}
}
if embedded == nil {
scope.err(fmt.Errorf("no encoder for %q exists in the requested target %#v", serializer.MediaType, contentSerializer), w, req)
return
}
embeddedEncoder = contentSerializer.EncoderForVersion(embedded, contentKind.GroupVersion())
} else {
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}
ctx := req.Context()
server := &WatchServer{
Watching: watcher,
Scope: scope,
@ -106,10 +120,20 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *http.Request,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, requestInfo, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1beta1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
@ -133,7 +157,8 @@ type WatchServer struct {
Encoder runtime.Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime.Encoder
Fixup func(runtime.Object)
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object
TimeoutFactory TimeoutFactory
}
@ -205,8 +230,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
@ -272,8 +296,7 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
// End of results.
return
}
obj := event.Object
s.Fixup(obj)
obj := s.Fixup(event.Object)
if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))

View File

@ -586,7 +586,7 @@ func TestWatchHTTPErrors(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
@ -646,7 +646,7 @@ func TestWatchHTTPDynamicClientErrors(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}
@ -708,7 +708,7 @@ func TestWatchHTTPTimeout(t *testing.T) {
Encoder: newCodec,
EmbeddedEncoder: newCodec,
Fixup: func(obj runtime.Object) {},
Fixup: func(obj runtime.Object) runtime.Object { return obj },
TimeoutFactory: &fakeTimeoutFactory{timeoutCh, done},
}

View File

@ -18,6 +18,7 @@ package resttest
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
@ -1460,7 +1461,7 @@ func (t *Tester) testListTableConversion(obj runtime.Object, assignFn AssignFunc
}
columns := table.ColumnDefinitions
if len(columns) == 0 {
t.Errorf("unexpected number of columns: %v", len(columns))
t.Fatalf("unexpected number of columns: %v\n%#v", len(columns), columns)
}
if !strings.EqualFold(columns[0].Name, "Name") || columns[0].Type != "string" || columns[0].Format != "name" {
t.Errorf("expect column 0 to be the name column: %#v", columns[0])
@ -1505,8 +1506,11 @@ func (t *Tester) testListTableConversion(obj runtime.Object, assignFn AssignFunc
}
}
if len(row.Cells) != len(table.ColumnDefinitions) {
t.Fatalf("unmatched row length on row %d: %#v", i, row.Cells)
}
}
data, _ := json.MarshalIndent(table, "", " ")
t.Logf("%s", string(data))
}
// =============================================================================

View File

@ -73,9 +73,11 @@ func (c defaultTableConvertor) ConvertToTable(ctx context.Context, object runtim
table.SelfLink = m.GetSelfLink()
}
}
table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: swaggerMetadataDescriptions["name"]},
{Name: "Created At", Type: "date", Description: swaggerMetadataDescriptions["creationTimestamp"]},
if opt, ok := tableOptions.(*metav1beta1.TableOptions); !ok || !opt.NoHeaders {
table.ColumnDefinitions = []metav1beta1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: swaggerMetadataDescriptions["name"]},
{Name: "Created At", Type: "date", Description: swaggerMetadataDescriptions["creationTimestamp"]},
}
}
return &table, nil
}

View File

@ -36,6 +36,19 @@ type dynamicClient struct {
var _ Interface = &dynamicClient{}
// ConfigFor returns a copy of the provided config with the
// appropriate dynamic client defaults set.
func ConfigFor(inConfig *rest.Config) *rest.Config {
config := rest.CopyConfig(inConfig)
config.AcceptContentTypes = "application/json"
config.ContentType = "application/json"
config.NegotiatedSerializer = basicNegotiatedSerializer{} // this gets used for discovery and error handling types
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return config
}
// NewForConfigOrDie creates a new Interface for the given config and
// panics if there is an error in the config.
func NewForConfigOrDie(c *rest.Config) Interface {
@ -46,17 +59,12 @@ func NewForConfigOrDie(c *rest.Config) Interface {
return ret
}
// NewForConfig creates a new dynamic client or returns an error.
func NewForConfig(inConfig *rest.Config) (Interface, error) {
config := rest.CopyConfig(inConfig)
config := ConfigFor(inConfig)
// for serializing the options
config.GroupVersion = &schema.GroupVersion{}
config.APIPath = "/if-you-see-this-search-for-the-break"
config.AcceptContentTypes = "application/json"
config.ContentType = "application/json"
config.NegotiatedSerializer = basicNegotiatedSerializer{} // this gets used for discovery and error handling types
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
restClient, err := rest.RESTClientFor(config)
if err != nil {

View File

@ -686,6 +686,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -54,6 +54,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/negotiation:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
@ -81,7 +82,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
}
responsewriters.WriteObjectNegotiated(r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
}
// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
@ -152,5 +153,5 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
http.Error(w, "", http.StatusNotFound)
return
}
responsewriters.WriteObjectNegotiated(r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
responsewriters.WriteObjectNegotiated(r.codecs, negotiation.DefaultEndpointRestrictions, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroup)
}

View File

@ -670,6 +670,10 @@
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/apis/meta/v1beta1/validation",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/apimachinery/pkg/conversion",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -39,18 +39,27 @@ go_test(
"//staging/src/k8s.io/api/scheduling/v1beta1:go_default_library",
"//staging/src/k8s.io/api/settings/v1alpha1:go_default_library",
"//staging/src/k8s.io/api/storage/v1alpha1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/test/integration/fixtures:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
"//staging/src/k8s.io/client-go/discovery/cached/disk:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/clientcmd:go_default_library",

View File

@ -19,26 +19,43 @@ package apiserver
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"path"
"reflect"
"strconv"
"strings"
"testing"
apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apiextensions-apiserver/test/integration/fixtures"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
metav1beta1 "k8s.io/apimachinery/pkg/apis/meta/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/dynamic"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/test/integration/framework"
@ -322,3 +339,456 @@ func TestNameInFieldSelector(t *testing.T) {
}
}
}
func TestTransformOnWatch(t *testing.T) {
tearDown, config, _, err := fixtures.StartDefaultServer(t)
if err != nil {
t.Fatal(err)
}
defer tearDown()
s, clientset, closeFn := setup(t)
defer closeFn()
apiExtensionClient, err := apiextensionsclient.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
fooCRD := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "foos.cr.bar.com",
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: "cr.bar.com",
Version: "v1",
Scope: apiextensionsv1beta1.NamespaceScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: "foos",
Kind: "Foo",
},
},
}
fooCRD, err = fixtures.CreateNewCustomResourceDefinition(fooCRD, apiExtensionClient, dynamicClient)
if err != nil {
t.Fatal(err)
}
crdGVR := schema.GroupVersionResource{Group: fooCRD.Spec.Group, Version: fooCRD.Spec.Version, Resource: "foos"}
crclient := dynamicClient.Resource(crdGVR).Namespace("default")
testcases := []struct {
name string
accept string
includeObject metav1beta1.IncludeObjectPolicy
object func(*testing.T) (metav1.Object, string, string)
wantErr func(*testing.T, error)
wantBody func(*testing.T, io.Reader)
}{
{
name: "verify columns on cluster scoped resources",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "default", Namespace: ""}, "", "namespaces"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableWatchEvents(t, 1, 3, metav1beta1.IncludeMetadata, json.NewDecoder(w))
},
},
{
name: "verify columns on CRDs in json",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-1"}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch("test-1", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableWatchEvents(t, 2, 2, metav1beta1.IncludeMetadata, json.NewDecoder(w))
},
},
{
name: "verify columns on CRDs in json;stream=watch",
accept: "application/json;stream=watch;as=Table;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-2"}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch("test-2", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableWatchEvents(t, 2, 2, metav1beta1.IncludeMetadata, json.NewDecoder(w))
},
},
{
name: "verify columns on CRDs in yaml",
accept: "application/yaml;as=Table;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
cr, err := crclient.Create(&unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"name": "test-3"}}}, metav1.CreateOptions{})
if err != nil {
t.Fatalf("unable to create cr: %v", err)
}
if _, err := crclient.Patch("test-3", types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`), metav1.PatchOptions{}); err != nil {
t.Fatalf("unable to patch cr: %v", err)
}
return cr, crdGVR.Group, "foos"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
// TODO: this should be a more specific error
if err.Error() != "only the following media types are accepted: application/json;stream=watch" {
t.Fatal(err)
}
},
},
{
name: "verify columns on services",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
svc, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-1"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
if err != nil {
t.Fatalf("unable to create service: %v", err)
}
if _, err := clientset.CoreV1().Services(ns).Patch(svc.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`)); err != nil {
t.Fatalf("unable to update service: %v", err)
}
return svc, "", "services"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableWatchEvents(t, 2, 7, metav1beta1.IncludeMetadata, json.NewDecoder(w))
},
},
{
name: "verify columns on services with no object",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1beta1",
includeObject: metav1beta1.IncludeNone,
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
obj, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-2"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
if err != nil {
t.Fatalf("unable to create object: %v", err)
}
if _, err := clientset.CoreV1().Services(ns).Patch(obj.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`)); err != nil {
t.Fatalf("unable to update object: %v", err)
}
return obj, "", "services"
},
wantBody: func(t *testing.T, w io.Reader) {
expectTableWatchEvents(t, 2, 7, metav1beta1.IncludeNone, json.NewDecoder(w))
},
},
{
name: "verify columns on services with full object",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1beta1",
includeObject: metav1beta1.IncludeObject,
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
obj, err := clientset.CoreV1().Services(ns).Create(&v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "test-3"}, Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 1000}}}})
if err != nil {
t.Fatalf("unable to create object: %v", err)
}
if _, err := clientset.CoreV1().Services(ns).Patch(obj.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`)); err != nil {
t.Fatalf("unable to update object: %v", err)
}
return obj, "", "services"
},
wantBody: func(t *testing.T, w io.Reader) {
objects := expectTableWatchEvents(t, 2, 7, metav1beta1.IncludeObject, json.NewDecoder(w))
var svc v1.Service
if err := json.Unmarshal(objects[1], &svc); err != nil {
t.Fatal(err)
}
if svc.Annotations["test"] != "1" || svc.Spec.Ports[0].Port != 1000 {
t.Fatalf("unexpected object: %#v", svc)
}
},
},
{
name: "verify partial metadata object on config maps",
accept: "application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
obj, err := clientset.CoreV1().ConfigMaps(ns).Create(&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test-1", Annotations: map[string]string{"test": "0"}}})
if err != nil {
t.Fatalf("unable to create object: %v", err)
}
if _, err := clientset.CoreV1().ConfigMaps(ns).Patch(obj.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`)); err != nil {
t.Fatalf("unable to update object: %v", err)
}
return obj, "", "configmaps"
},
wantBody: func(t *testing.T, w io.Reader) {
expectPartialObjectMetaEvents(t, json.NewDecoder(w), "0", "1")
},
},
{
name: "verify partial metadata object on config maps in protobuf",
accept: "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
ns := "default"
obj, err := clientset.CoreV1().ConfigMaps(ns).Create(&v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test-2", Annotations: map[string]string{"test": "0"}}})
if err != nil {
t.Fatalf("unable to create object: %v", err)
}
if _, err := clientset.CoreV1().ConfigMaps(ns).Patch(obj.Name, types.MergePatchType, []byte(`{"metadata":{"annotations":{"test":"1"}}}`)); err != nil {
t.Fatalf("unable to update object: %v", err)
}
return obj, "", "configmaps"
},
wantBody: func(t *testing.T, w io.Reader) {
expectPartialObjectMetaEventsProtobuf(t, w, "0", "1")
},
},
{
name: "verify error on unsupported mimetype protobuf for table conversion",
accept: "application/vnd.kubernetes.protobuf;as=Table;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
// TODO: this should be a more specific error
if err.Error() != "only the following media types are accepted: application/json, application/yaml, application/vnd.kubernetes.protobuf" {
t.Fatal(err)
}
},
},
{
name: "verify error on invalid mimetype - bad version",
accept: "application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
},
},
{
name: "verify error on invalid mimetype - bad group",
accept: "application/json;as=PartialObjectMetadata;g=k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
},
},
{
name: "verify error on invalid mimetype - bad kind",
accept: "application/json;as=PartialObject;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
},
},
{
name: "verify error on invalid mimetype - missing kind",
accept: "application/json;g=meta.k8s.io;v=v1beta1",
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsNotAcceptable(err) {
t.Fatal(err)
}
},
},
{
name: "verify error on invalid transform parameter",
accept: "application/json;as=Table;g=meta.k8s.io;v=v1beta1",
includeObject: metav1beta1.IncludeObjectPolicy("unrecognized"),
object: func(t *testing.T) (metav1.Object, string, string) {
return &metav1.ObjectMeta{Name: "kubernetes", Namespace: "default"}, "", "services"
},
wantErr: func(t *testing.T, err error) {
if !apierrors.IsBadRequest(err) || !strings.Contains(err.Error(), `Invalid value: "unrecognized": must be 'Metadata', 'Object', 'None', or empty`) {
t.Fatal(err)
}
},
},
}
for i := range testcases {
tc := testcases[i]
t.Run(tc.name, func(t *testing.T) {
obj, group, resource := tc.object(t)
cfg := dynamic.ConfigFor(config)
if len(group) == 0 {
cfg = dynamic.ConfigFor(&restclient.Config{Host: s.URL})
cfg.APIPath = "/api"
} else {
cfg.APIPath = "/apis"
}
cfg.GroupVersion = &schema.GroupVersion{Group: group, Version: "v1"}
client, err := restclient.RESTClientFor(cfg)
if err != nil {
t.Fatal(err)
}
rv, _ := strconv.Atoi(obj.GetResourceVersion())
if rv < 1 {
rv = 1
}
w, err := client.Get().
Resource(resource).NamespaceIfScoped(obj.GetNamespace(), len(obj.GetNamespace()) > 0).
SetHeader("Accept", tc.accept).
VersionedParams(&metav1.ListOptions{
ResourceVersion: strconv.Itoa(rv - 1),
Watch: true,
FieldSelector: fields.OneTermEqualSelector("metadata.name", obj.GetName()).String(),
}, metav1.ParameterCodec).
Param("includeObject", string(tc.includeObject)).
Stream()
if (tc.wantErr != nil) != (err != nil) {
t.Fatalf("unexpected error: %v", err)
}
if tc.wantErr != nil {
tc.wantErr(t, err)
return
}
if err != nil {
t.Fatal(err)
}
defer w.Close()
tc.wantBody(t, w)
})
}
}
func expectTableWatchEvents(t *testing.T, count, columns int, policy metav1beta1.IncludeObjectPolicy, d *json.Decoder) [][]byte {
t.Helper()
var objects [][]byte
for i := 0; i < count; i++ {
var evt metav1.WatchEvent
if err := d.Decode(&evt); err != nil {
t.Fatal(err)
}
var table metav1beta1.Table
if err := json.Unmarshal(evt.Object.Raw, &table); err != nil {
t.Fatal(err)
}
if i == 0 {
if len(table.ColumnDefinitions) != columns {
t.Fatalf("Got unexpected columns on first watch event: %d vs %#v", columns, table.ColumnDefinitions)
}
} else {
if len(table.ColumnDefinitions) != 0 {
t.Fatalf("Expected no columns on second watch event: %#v", table.ColumnDefinitions)
}
}
if len(table.Rows) != 1 {
t.Fatalf("Invalid rows: %#v", table.Rows)
}
row := table.Rows[0]
if len(row.Cells) != columns {
t.Fatalf("Invalid row width: %#v", row.Cells)
}
switch policy {
case metav1beta1.IncludeMetadata:
var meta metav1beta1.PartialObjectMetadata
if err := json.Unmarshal(row.Object.Raw, &meta); err != nil {
t.Fatalf("expected partial object: %v", err)
}
partialObj := metav1.TypeMeta{Kind: "PartialObjectMetadata", APIVersion: "meta.k8s.io/v1beta1"}
if meta.TypeMeta != partialObj {
t.Fatalf("expected partial object: %#v", meta)
}
case metav1beta1.IncludeNone:
if len(row.Object.Raw) != 0 {
t.Fatalf("Expected no object: %s", string(row.Object.Raw))
}
case metav1beta1.IncludeObject:
if len(row.Object.Raw) == 0 {
t.Fatalf("Expected object: %s", string(row.Object.Raw))
}
objects = append(objects, row.Object.Raw)
}
}
return objects
}
func expectPartialObjectMetaEvents(t *testing.T, d *json.Decoder, values ...string) {
t.Helper()
for i, value := range values {
var evt metav1.WatchEvent
if err := d.Decode(&evt); err != nil {
t.Fatal(err)
}
var meta metav1beta1.PartialObjectMetadata
if err := json.Unmarshal(evt.Object.Raw, &meta); err != nil {
t.Fatal(err)
}
typeMeta := metav1.TypeMeta{Kind: "PartialObjectMetadata", APIVersion: "meta.k8s.io/v1beta1"}
if meta.TypeMeta != typeMeta {
t.Fatalf("expected partial object: %#v", meta)
}
if meta.Annotations["test"] != value {
t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"])
}
}
}
func expectPartialObjectMetaEventsProtobuf(t *testing.T, r io.Reader, values ...string) {
scheme := runtime.NewScheme()
metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
rs := protobuf.NewRawSerializer(scheme, scheme, "application/vnd.kubernetes.protobuf")
d := streaming.NewDecoder(
protobuf.LengthDelimitedFramer.NewFrameReader(ioutil.NopCloser(r)),
rs,
)
ds := metainternalversion.Codecs.UniversalDeserializer()
for i, value := range values {
var evt metav1.WatchEvent
if _, _, err := d.Decode(nil, &evt); err != nil {
t.Fatal(err)
}
obj, gvk, err := ds.Decode(evt.Object.Raw, nil, nil)
if err != nil {
t.Fatal(err)
}
meta, ok := obj.(*metav1beta1.PartialObjectMetadata)
if !ok {
t.Fatalf("unexpected watch object %T", obj)
}
expected := &schema.GroupVersionKind{Kind: "PartialObjectMetadata", Version: "v1beta1", Group: "meta.k8s.io"}
if !reflect.DeepEqual(expected, gvk) {
t.Fatalf("expected partial object: %#v", meta)
}
if meta.Annotations["test"] != value {
t.Fatalf("expected event %d to have value %q instead of %q", i+1, value, meta.Annotations["test"])
}
}
}