Merge pull request #14823 from wojtek-t/move_events_to_separate_etcd

Move events to a separate etcd instance
pull/6/head
Alex Robinson 2015-10-05 16:28:04 -07:00
commit 32b9d8aad0
14 changed files with 290 additions and 88 deletions

View File

@ -22,7 +22,7 @@
"command": [ "command": [
"/bin/sh", "/bin/sh",
"-c", "-c",
"/usr/local/bin/etcd --listen-peer-urls=http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1" "/usr/local/bin/etcd --listen-peer-urls http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1"
], ],
"livenessProbe": { "livenessProbe": {
"httpGet": { "httpGet": {

View File

@ -44,6 +44,7 @@
{% endif -%} {% endif -%}
{% set etcd_servers = "--etcd-servers=http://127.0.0.1:4001" -%} {% set etcd_servers = "--etcd-servers=http://127.0.0.1:4001" -%}
{% set etcd_servers_overrides = "--etcd-servers-overrides=/events#http://127.0.0.1:4002" -%}
{% set service_cluster_ip_range = "" -%} {% set service_cluster_ip_range = "" -%}
{% if pillar['service_cluster_ip_range'] is defined -%} {% if pillar['service_cluster_ip_range'] is defined -%}
@ -88,7 +89,7 @@
{% set runtime_config = "--runtime-config=" + grains.runtime_config -%} {% set runtime_config = "--runtime-config=" + grains.runtime_config -%}
{% endif -%} {% endif -%}
{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%} {% set params = address + " " + etcd_servers + " " + etcd_servers_overrides + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure-port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%} {% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure-port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%}
# test_args has to be kept at the end, so they'll overwrite any prior configuration # test_args has to be kept at the end, so they'll overwrite any prior configuration

View File

@ -144,10 +144,12 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
} }
expEtcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("experimental").InterfacesFor, testapi.Experimental.GroupAndVersion(), etcdtest.PathPrefix()) expEtcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("experimental").InterfacesFor, testapi.Experimental.GroupAndVersion(), etcdtest.PathPrefix())
storageVersions["experimental"] = testapi.Experimental.GroupAndVersion() storageVersions["experimental"] = testapi.Experimental.GroupAndVersion()
if err != nil { if err != nil {
glog.Fatalf("Unable to get etcd storage for experimental: %v", err) glog.Fatalf("Unable to get etcd storage for experimental: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)
// Master // Master
host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://")) host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://"))
@ -166,8 +168,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
// Create a master and install handlers into mux. // Create a master and install handlers into mux.
m := master.New(&master.Config{ m := master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
ExpDatabaseStorage: expEtcdStorage,
KubeletClient: fakeKubeletClient{}, KubeletClient: fakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,

View File

@ -95,6 +95,7 @@ type APIServer struct {
AdmissionControlConfigFile string AdmissionControlConfigFile string
EtcdServerList []string EtcdServerList []string
EtcdConfigFile string EtcdConfigFile string
EtcdServersOverrides []string
EtcdPathPrefix string EtcdPathPrefix string
CorsAllowedOriginList []string CorsAllowedOriginList []string
AllowPrivileged bool AllowPrivileged bool
@ -211,6 +212,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.") fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.") fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
@ -253,6 +255,8 @@ func (s *APIServer) verifyClusterIPFlags() {
} }
} }
type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) { func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageVersion == "" { if storageVersion == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage") return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
@ -294,6 +298,45 @@ func generateStorageVersionMap(legacyVersion string, storageVersions string) map
return storageVersionMap return storageVersionMap
} }
// parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, storageDestinations *master.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 {
return
}
for _, override := range overrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}
apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
glog.Errorf("invalid resource definition: %s", tokens[0])
}
group := apiresource[0]
resource := apiresource[1]
apigroup, err := latest.Group(group)
if err != nil {
glog.Errorf("invalid api group %s: %v", group, err)
continue
}
if _, found := storageVersions[apigroup.Group]; !found {
glog.Errorf("Couldn't find the storage version for group %s", apigroup.Group)
continue
}
servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage)
}
}
// Run runs the specified APIServer. This should never exit. // Run runs the specified APIServer. This should never exit.
func (s *APIServer) Run(_ []string) error { func (s *APIServer) Run(_ []string) error {
s.verifyClusterIPFlags() s.verifyClusterIPFlags()
@ -369,6 +412,8 @@ func (s *APIServer) Run(_ []string) error {
return err return err
} }
storageDestinations := master.NewStorageDestinations()
storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions) storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions)
if _, found := storageVersions[legacyV1Group.Group]; !found { if _, found := storageVersions[legacyV1Group.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.Group, storageVersions) glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.Group, storageVersions)
@ -377,8 +422,8 @@ func (s *APIServer) Run(_ []string) error {
if err != nil { if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
} }
storageDestinations.AddAPIGroup("", etcdStorage)
var expEtcdStorage storage.Interface
if enableExp { if enableExp {
expGroup, err := latest.Group("experimental") expGroup, err := latest.Group("experimental")
if err != nil { if err != nil {
@ -387,12 +432,15 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[expGroup.Group]; !found { if _, found := storageVersions[expGroup.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.Group, storageVersions) glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.Group, storageVersions)
} }
expEtcdStorage, err = newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix) expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix)
if err != nil { if err != nil {
glog.Fatalf("Invalid experimental storage version or misconfigured etcd: %v", err) glog.Fatalf("Invalid experimental storage version or misconfigured etcd: %v", err)
} }
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)
} }
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd)
n := s.ServiceClusterIPRange n := s.ServiceClusterIPRange
// Default to the private server key for service account token signing // Default to the private server key for service account token signing
@ -460,10 +508,8 @@ func (s *APIServer) Run(_ []string) error {
} }
} }
config := &master.Config{ config := &master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
ExpDatabaseStorage: expEtcdStorage, StorageVersions: storageVersions,
StorageVersions: storageVersions,
EventTTL: s.EventTTL, EventTTL: s.EventTTL,
KubeletClient: kubeletClient, KubeletClient: kubeletClient,
ServiceClusterIPRange: &n, ServiceClusterIPRange: &n,

View File

@ -19,7 +19,12 @@ package app
import ( import (
"reflect" "reflect"
"regexp" "regexp"
"strings"
"testing" "testing"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/storage"
) )
func TestLongRunningRequestRegexp(t *testing.T) { func TestLongRunningRequestRegexp(t *testing.T) {
@ -98,3 +103,54 @@ func TestGenerateStorageVersionMap(t *testing.T) {
} }
} }
} }
func TestUpdateEtcdOverrides(t *testing.T) {
storageVersions := generateStorageVersionMap("", "v1,experimental/v1alpha1")
testCases := []struct {
apigroup string
resource string
servers []string
}{
{
apigroup: "",
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
{
apigroup: "",
resource: "resource",
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
},
{
apigroup: "experimental",
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
}
for _, test := range testCases {
newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
return nil, nil
}
storageDestinations := master.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
updateEtcdOverrides([]string{override}, storageVersions, "", &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok {
t.Errorf("apigroup: %s not created", test.apigroup)
continue
}
if apigroup.Overrides == nil {
t.Errorf("Overrides not created for: %s", test.apigroup)
continue
}
if _, ok := apigroup.Overrides[test.resource]; !ok {
t.Errorf("override not created for: %s", test.resource)
continue
}
}
}

View File

@ -77,6 +77,7 @@ etcd-config
etcd-prefix etcd-prefix
etcd-server etcd-server
etcd-servers etcd-servers
etcd-servers-overrides
event-burst event-burst
event-qps event-qps
event-ttl event-ttl

View File

@ -98,10 +98,78 @@ const (
DefaultEtcdPathPrefix = "/registry" DefaultEtcdPathPrefix = "/registry"
) )
// StorageDestinations is a mapping from API group & resource to
// the underlying storage interfaces.
type StorageDestinations struct {
APIGroups map[string]*StorageDestinationsForAPIGroup
}
type StorageDestinationsForAPIGroup struct {
Default storage.Interface
Overrides map[string]storage.Interface
}
func NewStorageDestinations() StorageDestinations {
return StorageDestinations{
APIGroups: map[string]*StorageDestinationsForAPIGroup{},
}
}
func (s *StorageDestinations) AddAPIGroup(group string, defaultStorage storage.Interface) {
s.APIGroups[group] = &StorageDestinationsForAPIGroup{
Default: defaultStorage,
Overrides: map[string]storage.Interface{},
}
}
func (s *StorageDestinations) AddStorageOverride(group, resource string, override storage.Interface) {
if _, ok := s.APIGroups[group]; !ok {
s.AddAPIGroup(group, nil)
}
if s.APIGroups[group].Overrides == nil {
s.APIGroups[group].Overrides = map[string]storage.Interface{}
}
s.APIGroups[group].Overrides[resource] = override
}
func (s *StorageDestinations) get(group, resource string) storage.Interface {
apigroup, ok := s.APIGroups[group]
if !ok {
glog.Errorf("No storage defined for API group: '%s'", apigroup)
return nil
}
if apigroup.Overrides != nil {
if client, exists := apigroup.Overrides[resource]; exists {
return client
}
}
return apigroup.Default
}
// Get all backends for all registered storage destinations.
// Used for getting all instances for health validations.
func (s *StorageDestinations) backends() []string {
backends := sets.String{}
for _, group := range s.APIGroups {
if group.Default != nil {
for _, backend := range group.Default.Backends() {
backends.Insert(backend)
}
}
if group.Overrides != nil {
for _, storage := range group.Overrides {
for _, backend := range storage.Backends() {
backends.Insert(backend)
}
}
}
}
return backends.List()
}
// Config is a structure used to configure a Master. // Config is a structure used to configure a Master.
type Config struct { type Config struct {
DatabaseStorage storage.Interface StorageDestinations StorageDestinations
ExpDatabaseStorage storage.Interface
// StorageVersions is a map between groups and their storage versions // StorageVersions is a map between groups and their storage versions
StorageVersions map[string]string StorageVersions map[string]string
EventTTL time.Duration EventTTL time.Duration
@ -435,35 +503,36 @@ func NewHandlerContainer(mux *http.ServeMux) *restful.Container {
func (m *Master) init(c *Config) { func (m *Master) init(c *Config) {
healthzChecks := []healthz.HealthzChecker{} healthzChecks := []healthz.HealthzChecker{}
m.clock = util.RealClock{} m.clock = util.RealClock{}
podStorage := podetcd.NewStorage(c.DatabaseStorage, c.EnableWatchCache, c.KubeletClient) dbClient := func(resource string) storage.Interface { return c.StorageDestinations.get("", resource) }
podStorage := podetcd.NewStorage(dbClient("pods"), c.EnableWatchCache, c.KubeletClient)
podTemplateStorage := podtemplateetcd.NewREST(c.DatabaseStorage) podTemplateStorage := podtemplateetcd.NewREST(dbClient("podTemplates"))
eventStorage := eventetcd.NewREST(c.DatabaseStorage, uint64(c.EventTTL.Seconds())) eventStorage := eventetcd.NewREST(dbClient("events"), uint64(c.EventTTL.Seconds()))
limitRangeStorage := limitrangeetcd.NewREST(c.DatabaseStorage) limitRangeStorage := limitrangeetcd.NewREST(dbClient("limitRanges"))
resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(c.DatabaseStorage) resourceQuotaStorage, resourceQuotaStatusStorage := resourcequotaetcd.NewREST(dbClient("resourceQuotas"))
secretStorage := secretetcd.NewREST(c.DatabaseStorage) secretStorage := secretetcd.NewREST(dbClient("secrets"))
serviceAccountStorage := serviceaccountetcd.NewREST(c.DatabaseStorage) serviceAccountStorage := serviceaccountetcd.NewREST(dbClient("serviceAccounts"))
persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(c.DatabaseStorage) persistentVolumeStorage, persistentVolumeStatusStorage := pvetcd.NewREST(dbClient("persistentVolumes"))
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(c.DatabaseStorage) persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage := pvcetcd.NewREST(dbClient("persistentVolumeClaims"))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(c.DatabaseStorage) namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(dbClient("namespaces"))
m.namespaceRegistry = namespace.NewRegistry(namespaceStorage) m.namespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(c.DatabaseStorage, c.EnableWatchCache) endpointsStorage := endpointsetcd.NewREST(dbClient("endpoints"), c.EnableWatchCache)
m.endpointRegistry = endpoint.NewRegistry(endpointsStorage) m.endpointRegistry = endpoint.NewRegistry(endpointsStorage)
nodeStorage, nodeStatusStorage := nodeetcd.NewREST(c.DatabaseStorage, c.EnableWatchCache, c.KubeletClient) nodeStorage, nodeStatusStorage := nodeetcd.NewREST(dbClient("nodes"), c.EnableWatchCache, c.KubeletClient)
m.nodeRegistry = node.NewRegistry(nodeStorage) m.nodeRegistry = node.NewRegistry(nodeStorage)
serviceStorage := serviceetcd.NewREST(c.DatabaseStorage) serviceStorage := serviceetcd.NewREST(dbClient("services"))
m.serviceRegistry = service.NewRegistry(serviceStorage) m.serviceRegistry = service.NewRegistry(serviceStorage)
var serviceClusterIPRegistry service.RangeRegistry var serviceClusterIPRegistry service.RangeRegistry
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec) mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", c.DatabaseStorage) etcd := etcdallocator.NewEtcd(mem, "/ranges/serviceips", "serviceipallocation", dbClient("services"))
serviceClusterIPRegistry = etcd serviceClusterIPRegistry = etcd
return etcd return etcd
}) })
@ -472,13 +541,13 @@ func (m *Master) init(c *Config) {
var serviceNodePortRegistry service.RangeRegistry var serviceNodePortRegistry service.RangeRegistry
serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePortRange, func(max int, rangeSpec string) allocator.Interface { serviceNodePortAllocator := portallocator.NewPortAllocatorCustom(m.serviceNodePortRange, func(max int, rangeSpec string) allocator.Interface {
mem := allocator.NewAllocationMap(max, rangeSpec) mem := allocator.NewAllocationMap(max, rangeSpec)
etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", c.DatabaseStorage) etcd := etcdallocator.NewEtcd(mem, "/ranges/servicenodeports", "servicenodeportallocation", dbClient("services"))
serviceNodePortRegistry = etcd serviceNodePortRegistry = etcd
return etcd return etcd
}) })
m.serviceNodePortAllocator = serviceNodePortRegistry m.serviceNodePortAllocator = serviceNodePortRegistry
controllerStorage := controlleretcd.NewREST(c.DatabaseStorage) controllerStorage := controlleretcd.NewREST(dbClient("replicationControllers"))
// TODO: Factor out the core API registration // TODO: Factor out the core API registration
m.storage = map[string]rest.Storage{ m.storage = map[string]rest.Storage{
@ -579,7 +648,7 @@ func (m *Master) init(c *Config) {
// allGroups records all supported groups at /apis // allGroups records all supported groups at /apis
allGroups := []api.APIGroup{} allGroups := []api.APIGroup{}
if m.exp { if m.exp {
m.thirdPartyStorage = c.ExpDatabaseStorage m.thirdPartyStorage = c.StorageDestinations.APIGroups["experimental"].Default
m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{} m.thirdPartyResources = map[string]*thirdpartyresourcedataetcd.REST{}
expVersion := m.experimental(c) expVersion := m.experimental(c)
@ -752,7 +821,8 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server {
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"}, "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"}, "scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
} }
for ix, machine := range c.DatabaseStorage.Backends() {
for ix, machine := range c.StorageDestinations.backends() {
etcdUrl, err := url.Parse(machine) etcdUrl, err := url.Parse(machine)
if err != nil { if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err) glog.Errorf("Failed to parse etcd url for validation: %v", err)
@ -962,13 +1032,16 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
// experimental returns the resources and codec for the experimental api // experimental returns the resources and codec for the experimental api
func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion { func (m *Master) experimental(c *Config) *apiserver.APIGroupVersion {
controllerStorage := expcontrolleretcd.NewStorage(c.DatabaseStorage) controllerStorage := expcontrolleretcd.NewStorage(c.StorageDestinations.get("", "replicationControllers"))
autoscalerStorage := horizontalpodautoscaleretcd.NewREST(c.ExpDatabaseStorage) dbClient := func(resource string) storage.Interface {
thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(c.ExpDatabaseStorage) return c.StorageDestinations.get("experimental", resource)
daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(c.ExpDatabaseStorage) }
deploymentStorage := deploymentetcd.NewStorage(c.ExpDatabaseStorage) autoscalerStorage := horizontalpodautoscaleretcd.NewREST(dbClient("horizonalpodautoscalers"))
jobStorage, jobStatusStorage := jobetcd.NewREST(c.ExpDatabaseStorage) thirdPartyResourceStorage := thirdpartyresourceetcd.NewREST(dbClient("thirdpartyresources"))
ingressStorage := ingressetcd.NewREST(c.ExpDatabaseStorage) daemonSetStorage, daemonSetStatusStorage := daemonetcd.NewREST(dbClient("daemonsets"))
deploymentStorage := deploymentetcd.NewStorage(dbClient("deployments"))
jobStorage, jobStatusStorage := jobetcd.NewREST(dbClient("jobs"))
ingressStorage := ingressetcd.NewREST(dbClient("ingress"))
thirdPartyControl := ThirdPartyController{ thirdPartyControl := ThirdPartyController{
master: m, master: m,

View File

@ -65,9 +65,11 @@ func setUp(t *testing.T) (Master, Config, *assert.Assertions) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"}
storageVersions := make(map[string]string) storageVersions := make(map[string]string)
config.DatabaseStorage = etcdstorage.NewEtcdStorage(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()) storageDestinations := NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdstorage.NewEtcdStorage(fakeClient, testapi.Default.Codec(), etcdtest.PathPrefix()))
storageDestinations.AddAPIGroup("experimental", etcdstorage.NewEtcdStorage(fakeClient, testapi.Experimental.Codec(), etcdtest.PathPrefix()))
config.StorageDestinations = storageDestinations
storageVersions[""] = testapi.Default.Version() storageVersions[""] = testapi.Default.Version()
config.ExpDatabaseStorage = etcdstorage.NewEtcdStorage(fakeClient, testapi.Experimental.Codec(), etcdtest.PathPrefix())
storageVersions["experimental"] = testapi.Experimental.GroupAndVersion() storageVersions["experimental"] = testapi.Experimental.GroupAndVersion()
config.StorageVersions = storageVersions config.StorageVersions = storageVersions
master.nodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{}) master.nodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{})

View File

@ -390,6 +390,8 @@ func TestAuthModeAlwaysAllow(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req) m.Handler.ServeHTTP(w, req)
@ -397,7 +399,7 @@ func TestAuthModeAlwaysAllow(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -506,6 +508,8 @@ func TestAuthModeAlwaysDeny(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -514,7 +518,7 @@ func TestAuthModeAlwaysDeny(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -574,6 +578,8 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -582,7 +588,7 @@ func TestAliceNotForbiddenOrUnauthorized(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -662,6 +668,8 @@ func TestBobIsForbidden(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -670,7 +678,7 @@ func TestBobIsForbidden(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -724,6 +732,8 @@ func TestUnknownUserIsUnauthorized(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -732,7 +742,7 @@ func TestUnknownUserIsUnauthorized(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -809,6 +819,8 @@ func TestAuthorizationAttributeDetermination(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
trackingAuthorizer := &trackingAuthorizer{} trackingAuthorizer := &trackingAuthorizer{}
@ -819,7 +831,7 @@ func TestAuthorizationAttributeDetermination(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -890,6 +902,8 @@ func TestNamespaceAuthorization(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
a := newAuthorizerWithContents(t, `{"namespace": "foo"} a := newAuthorizerWithContents(t, `{"namespace": "foo"}
`) `)
@ -901,7 +915,7 @@ func TestNamespaceAuthorization(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -1006,6 +1020,8 @@ func TestKindAuthorization(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
a := newAuthorizerWithContents(t, `{"resource": "services"} a := newAuthorizerWithContents(t, `{"resource": "services"}
`) `)
@ -1017,7 +1033,7 @@ func TestKindAuthorization(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,
@ -1110,6 +1126,8 @@ func TestReadOnlyAuthorization(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
a := newAuthorizerWithContents(t, `{"readonly": true}`) a := newAuthorizerWithContents(t, `{"readonly": true}`)
@ -1120,7 +1138,7 @@ func TestReadOnlyAuthorization(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,

View File

@ -37,7 +37,6 @@ import (
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/master" "k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/tools/etcdtest" "k8s.io/kubernetes/pkg/tools/etcdtest"
"k8s.io/kubernetes/plugin/pkg/admission/admit" "k8s.io/kubernetes/plugin/pkg/admission/admit"
) )
@ -72,8 +71,6 @@ type MasterComponents struct {
rcStopCh chan struct{} rcStopCh chan struct{}
// Used to stop master components individually, and via MasterComponents.Stop // Used to stop master components individually, and via MasterComponents.Stop
once sync.Once once sync.Once
// Kubernetes etcd storage, has embedded etcd client
EtcdStorage storage.Interface
} }
// Config is a struct of configuration directives for NewMasterComponents. // Config is a struct of configuration directives for NewMasterComponents.
@ -92,7 +89,7 @@ type Config struct {
// NewMasterComponents creates, initializes and starts master components based on the given config. // NewMasterComponents creates, initializes and starts master components based on the given config.
func NewMasterComponents(c *Config) *MasterComponents { func NewMasterComponents(c *Config) *MasterComponents {
m, s, e := startMasterOrDie(c.MasterConfig) m, s := startMasterOrDie(c.MasterConfig)
// TODO: Allow callers to pipe through a different master url and create a client/start components using it. // TODO: Allow callers to pipe through a different master url and create a client/start components using it.
glog.Infof("Master %+v", s.URL) glog.Infof("Master %+v", s.URL)
if c.DeleteEtcdKeys { if c.DeleteEtcdKeys {
@ -114,24 +111,21 @@ func NewMasterComponents(c *Config) *MasterComponents {
RestClient: restClient, RestClient: restClient,
ControllerManager: controllerManager, ControllerManager: controllerManager,
rcStopCh: rcStopCh, rcStopCh: rcStopCh,
EtcdStorage: e,
once: once, once: once,
} }
} }
// startMasterOrDie starts a kubernetes master and an httpserver to handle api requests // startMasterOrDie starts a kubernetes master and an httpserver to handle api requests
func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, storage.Interface) { func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server) {
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
m.Handler.ServeHTTP(w, req) m.Handler.ServeHTTP(w, req)
})) }))
var etcdStorage storage.Interface
var err error
if masterConfig == nil { if masterConfig == nil {
etcdClient := NewEtcdClient() etcdClient := NewEtcdClient()
storageVersions := make(map[string]string) storageVersions := make(map[string]string)
etcdStorage, err = master.NewEtcdStorage(etcdClient, latest.GroupOrDie("").InterfacesFor, latest.GroupOrDie("").GroupVersion, etcdtest.PathPrefix()) etcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("").InterfacesFor, latest.GroupOrDie("").GroupVersion, etcdtest.PathPrefix())
storageVersions[""] = latest.GroupOrDie("").GroupVersion storageVersions[""] = latest.GroupOrDie("").GroupVersion
if err != nil { if err != nil {
glog.Fatalf("Failed to create etcd storage for master %v", err) glog.Fatalf("Failed to create etcd storage for master %v", err)
@ -141,10 +135,12 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
if err != nil { if err != nil {
glog.Fatalf("Failed to create etcd storage for master %v", err) glog.Fatalf("Failed to create etcd storage for master %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)
masterConfig = &master.Config{ masterConfig = &master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
ExpDatabaseStorage: expEtcdStorage,
StorageVersions: storageVersions, StorageVersions: storageVersions,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableExp: true, EnableExp: true,
@ -157,11 +153,9 @@ func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Se
Authorizer: apiserver.NewAlwaysAllowAuthorizer(), Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
AdmissionControl: admit.NewAlwaysAdmit(), AdmissionControl: admit.NewAlwaysAdmit(),
} }
} else {
etcdStorage = masterConfig.DatabaseStorage
} }
m = master.New(masterConfig) m = master.New(masterConfig)
return m, s, etcdStorage return m, s
} }
func (m *MasterComponents) stopRCManager() { func (m *MasterComponents) stopRCManager() {
@ -285,20 +279,22 @@ func RunAMaster(t *testing.T) (*master.Master, *httptest.Server) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)
m := master.New(&master.Config{ m := master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
ExpDatabaseStorage: expEtcdStorage, KubeletClient: client.FakeKubeletClient{},
KubeletClient: client.FakeKubeletClient{}, EnableLogsSupport: false,
EnableLogsSupport: false, EnableProfiling: true,
EnableProfiling: true, EnableUISupport: false,
EnableUISupport: false, APIPrefix: "/api",
APIPrefix: "/api", APIGroupPrefix: "/apis",
APIGroupPrefix: "/apis", EnableExp: true,
EnableExp: true, Authorizer: apiserver.NewAlwaysAllowAuthorizer(),
Authorizer: apiserver.NewAlwaysAllowAuthorizer(), AdmissionControl: admit.NewAlwaysAdmit(),
AdmissionControl: admit.NewAlwaysAdmit(), StorageVersions: storageVersions,
StorageVersions: storageVersions,
}) })
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

View File

@ -58,6 +58,8 @@ func TestUnschedulableNodes(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("Couldn't create etcd storage: %v", err) t.Fatalf("Couldn't create etcd storage: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
framework.DeleteAllEtcdKeys() framework.DeleteAllEtcdKeys()
var m *master.Master var m *master.Master
@ -67,7 +69,7 @@ func TestUnschedulableNodes(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,

View File

@ -51,6 +51,8 @@ func TestSecrets(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
var m *master.Master var m *master.Master
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -59,7 +61,7 @@ func TestSecrets(t *testing.T) {
defer s.Close() defer s.Close()
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,

View File

@ -345,6 +345,8 @@ func startServiceAccountTestServer(t *testing.T) (*client.Client, client.Config,
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
// Listener // Listener
var m *master.Master var m *master.Master
@ -411,16 +413,16 @@ func startServiceAccountTestServer(t *testing.T) (*client.Client, client.Config,
// Create a master and install handlers into mux. // Create a master and install handlers into mux.
m = master.New(&master.Config{ m = master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableLogsSupport: false, EnableLogsSupport: false,
EnableUISupport: false, EnableUISupport: false,
EnableIndex: true, EnableIndex: true,
APIPrefix: "/api", APIPrefix: "/api",
Authenticator: authenticator, Authenticator: authenticator,
Authorizer: authorizer, Authorizer: authorizer,
AdmissionControl: serviceAccountAdmission, AdmissionControl: serviceAccountAdmission,
StorageVersions: map[string]string{"": testapi.Default.Version()}, StorageVersions: map[string]string{"": testapi.Default.Version()},
}) })
// Start the service account and service account token controllers // Start the service account and service account token controllers

View File

@ -70,9 +70,11 @@ func runAMaster(t *testing.T) (*master.Master, *httptest.Server) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
m := master.New(&master.Config{ m := master.New(&master.Config{
DatabaseStorage: etcdStorage, StorageDestinations: storageDestinations,
KubeletClient: client.FakeKubeletClient{}, KubeletClient: client.FakeKubeletClient{},
EnableCoreControllers: true, EnableCoreControllers: true,
EnableLogsSupport: false, EnableLogsSupport: false,