Merge pull request #40624 from smarterclayton/storage_options

Automatic merge from submit-queue (batch tested with PRs 39217, 40624)

Allow StorageFactory to wrap encoders and decoders
pull/6/head
Kubernetes Submit Queue 2017-02-01 22:28:40 -08:00 committed by GitHub
commit c92f29a455
2 changed files with 158 additions and 63 deletions

View File

@ -74,7 +74,19 @@ type DefaultStorageFactory struct {
APIResourceConfigSource APIResourceConfigSource APIResourceConfigSource APIResourceConfigSource
// newStorageCodecFn exists to be overwritten for unit testing. // newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error) newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, err error)
}
// StorageCodecConfig are the arguments passed to newStorageCodecFn
type StorageCodecConfig struct {
StorageMediaType string
StorageSerializer runtime.StorageSerializer
StorageVersion schema.GroupVersion
MemoryVersion schema.GroupVersion
Config storagebackend.Config
EncoderDecoratorFn func(runtime.Encoder) runtime.Encoder
DecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
} }
type groupResourceOverrides struct { type groupResourceOverrides struct {
@ -95,6 +107,34 @@ type groupResourceOverrides struct {
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance // of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
// The order of the slice matters! It is the priority order of lookup for finding a storage location // The order of the slice matters! It is the priority order of lookup for finding a storage location
cohabitatingResources []schema.GroupResource cohabitatingResources []schema.GroupResource
// encoderDecoratorFn is optional and may wrap the provided encoder prior to being serialized.
encoderDecoratorFn func(runtime.Encoder) runtime.Encoder
// decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of
// returned decoders will be priority for attempt to decode.
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
}
// Apply overrides the provided config and options if the override has a value in that position
func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
if len(o.etcdLocation) > 0 {
config.ServerList = o.etcdLocation
}
if len(o.etcdPrefix) > 0 {
config.Prefix = o.etcdPrefix
}
if len(o.mediaType) > 0 {
options.StorageMediaType = o.mediaType
}
if o.serializer != nil {
options.StorageSerializer = o.serializer
}
if o.encoderDecoratorFn != nil {
options.EncoderDecoratorFn = o.encoderDecoratorFn
}
if o.decoderDecoratorFn != nil {
options.DecoderDecoratorFn = o.decoderDecoratorFn
}
} }
var _ StorageFactory = &DefaultStorageFactory{} var _ StorageFactory = &DefaultStorageFactory{}
@ -165,6 +205,15 @@ func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schem
} }
} }
func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
overrides := s.Overrides[groupResource]
overrides.encoderDecoratorFn = encoderDecoratorFn
overrides.decoderDecoratorFn = decoderDecoratorFn
s.Overrides[groupResource] = overrides
}
}
func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource { func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource {
return schema.GroupResource{Group: resource.Group, Resource: AllResources} return schema.GroupResource{Group: resource.Group, Resource: AllResources}
} }
@ -184,64 +233,38 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.Gro
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) { func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource) chosenStorageResource := s.getStorageGroupResource(groupResource)
groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
exactResourceOverride := s.Overrides[chosenStorageResource]
overriddenEtcdLocations := []string{}
if len(groupOverride.etcdLocation) > 0 {
overriddenEtcdLocations = groupOverride.etcdLocation
}
if len(exactResourceOverride.etcdLocation) > 0 {
overriddenEtcdLocations = exactResourceOverride.etcdLocation
}
etcdPrefix := s.StorageConfig.Prefix
if len(groupOverride.etcdPrefix) > 0 {
etcdPrefix = groupOverride.etcdPrefix
}
if len(exactResourceOverride.etcdPrefix) > 0 {
etcdPrefix = exactResourceOverride.etcdPrefix
}
etcdMediaType := s.DefaultMediaType
if len(groupOverride.mediaType) != 0 {
etcdMediaType = groupOverride.mediaType
}
if len(exactResourceOverride.mediaType) != 0 {
etcdMediaType = exactResourceOverride.mediaType
}
etcdSerializer := s.DefaultSerializer
if groupOverride.serializer != nil {
etcdSerializer = groupOverride.serializer
}
if exactResourceOverride.serializer != nil {
etcdSerializer = exactResourceOverride.serializer
}
// operate on copy // operate on copy
config := s.StorageConfig storageConfig := s.StorageConfig
config.Prefix = etcdPrefix codecConfig := StorageCodecConfig{
if len(overriddenEtcdLocations) > 0 { StorageMediaType: s.DefaultMediaType,
config.ServerList = overriddenEtcdLocations StorageSerializer: s.DefaultSerializer,
} }
storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource) if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
override.Apply(&storageConfig, &codecConfig)
}
if override, ok := s.Overrides[chosenStorageResource]; ok {
override.Apply(&storageConfig, &codecConfig)
}
var err error
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil { if err != nil {
return nil, err return nil, err
} }
internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource) codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil { if err != nil {
return nil, err return nil, err
} }
codecConfig.Config = storageConfig
codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config) storageConfig.Codec, err = s.newStorageCodecFn(codecConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config) return &storageConfig, nil
config.Codec = codec
return &config, nil
} }
// Get all backends for all registered storage destinations. // Get all backends for all registered storage destinations.
@ -257,41 +280,51 @@ func (s *DefaultStorageFactory) Backends() []string {
// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested // NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions. // storage and memory versions.
func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (runtime.Codec, error) { func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
mediaType, _, err := mime.ParseMediaType(storageMediaType) mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
if err != nil { if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType) return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
} }
serializer, ok := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), mediaType) serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok { if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType) return nil, fmt.Errorf("unable to find serializer for %q", opts.StorageMediaType)
} }
s := serializer.Serializer s := serializer.Serializer
// etcd2 only supports string data - we must wrap any result before returning // etcd2 only supports string data - we must wrap any result before returning
// TODO: storagebackend should return a boolean indicating whether it supports binary data // TODO: storagebackend should return a boolean indicating whether it supports binary data
if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) { if !serializer.EncodesAsText && (opts.Config.Type == storagebackend.StorageTypeUnset || opts.Config.Type == storagebackend.StorageTypeETCD2) {
glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2") glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2")
s = runtime.NewBase64Serializer(s) s = runtime.NewBase64Serializer(s)
} }
encoder := ns.EncoderForVersion( // Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will
s, // be passed to the recognizer so that multiple decoders are available.
var encoder runtime.Encoder = s
if opts.EncoderDecoratorFn != nil {
encoder = opts.EncoderDecoratorFn(encoder)
}
decoders := []runtime.Decoder{s, opts.StorageSerializer.UniversalDeserializer()}
if opts.DecoderDecoratorFn != nil {
decoders = opts.DecoderDecoratorFn(decoders)
}
// Ensure the storage receives the correct version.
encoder = opts.StorageSerializer.EncoderForVersion(
encoder,
runtime.NewMultiGroupVersioner( runtime.NewMultiGroupVersioner(
storageVersion, opts.StorageVersion,
schema.GroupKind{Group: storageVersion.Group}, schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: memoryVersion.Group}, schema.GroupKind{Group: opts.MemoryVersion.Group},
), ),
) )
decoder := opts.StorageSerializer.DecoderToVersion(
ds := recognizer.NewDecoder(s, ns.UniversalDeserializer()) recognizer.NewDecoder(decoders...),
decoder := ns.DecoderToVersion(
ds,
runtime.NewMultiGroupVersioner( runtime.NewMultiGroupVersioner(
memoryVersion, opts.MemoryVersion,
schema.GroupKind{Group: memoryVersion.Group}, schema.GroupKind{Group: opts.MemoryVersion.Group},
schema.GroupKind{Group: storageVersion.Group}, schema.GroupKind{Group: opts.StorageVersion.Group},
), ),
) )

View File

@ -20,6 +20,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
@ -27,6 +28,67 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions"
) )
type fakeNegotiater struct {
serializer, streamSerializer runtime.Serializer
framer runtime.Framer
types, streamTypes []string
}
func (n *fakeNegotiater) SupportedMediaTypes() []runtime.SerializerInfo {
var out []runtime.SerializerInfo
for _, s := range n.types {
info := runtime.SerializerInfo{Serializer: n.serializer, MediaType: s, EncodesAsText: true}
for _, t := range n.streamTypes {
if t == s {
info.StreamSerializer = &runtime.StreamSerializerInfo{
EncodesAsText: true,
Framer: n.framer,
Serializer: n.streamSerializer,
}
}
}
out = append(out, info)
}
return out
}
func (n *fakeNegotiater) UniversalDeserializer() runtime.Decoder {
return n.serializer
}
func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return n.serializer
}
func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return n.serializer
}
func TestDefaultStorageFactory(t *testing.T) {
ns := &fakeNegotiater{types: []string{"test/test"}}
f := NewDefaultStorageFactory(storagebackend.Config{}, "test/test", ns, NewDefaultResourceEncodingConfig(), NewResourceConfig())
f.AddCohabitatingResources(schema.GroupResource{Resource: "test"}, schema.GroupResource{Resource: "test2", Group: "2"})
called := false
testEncoderChain := func(e runtime.Encoder) runtime.Encoder {
called = true
return e
}
f.AddSerializationChains(testEncoderChain, nil, schema.GroupResource{Resource: "test"})
f.SetEtcdLocation(schema.GroupResource{Resource: "*"}, []string{"/server2"})
f.SetEtcdPrefix(schema.GroupResource{Resource: "test"}, "/prefix_for_test")
config, err := f.NewConfig(schema.GroupResource{Resource: "test"})
if err != nil {
t.Fatal(err)
}
if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.ServerList, []string{"/server2"}) {
t.Errorf("unexpected config %#v", config)
}
if !called {
t.Errorf("expected encoder chain to be called")
}
}
func TestUpdateEtcdOverrides(t *testing.T) { func TestUpdateEtcdOverrides(t *testing.T) {
testCases := []struct { testCases := []struct {
resource schema.GroupResource resource schema.GroupResource