From 021cf6480857086135faf36ee8e1078f227a7cda Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 8 Jul 2014 12:55:11 -0400 Subject: [PATCH] Change proxy config to reuse util/config Splits endpoint and service configuration into their own objects. Also makes the endpoint and service configuration tests correct - there was a race condition previously that meant tests were passing but not checking correct code. --- cmd/proxy/proxy.go | 16 +- pkg/proxy/config/config.go | 406 ++++++++++++++------------------ pkg/proxy/config/config_test.go | 171 ++++++++------ pkg/proxy/config/etcd.go | 48 ++-- pkg/proxy/config/file.go | 12 +- 5 files changed, 315 insertions(+), 338 deletions(-) diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index b38f16bf36..35b8f4d38b 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -41,27 +41,27 @@ func main() { glog.Infof("Using configuration file %s and etcd_servers %s", *configFile, *etcdServers) - proxyConfig := config.NewServiceConfig() + serviceConfig := config.NewServiceConfig() + endpointsConfig := config.NewEndpointsConfig() // Create a configuration source that handles configuration from etcd. etcdClient := etcd.NewClient([]string{*etcdServers}) config.NewConfigSourceEtcd(etcdClient, - proxyConfig.GetServiceConfigurationChannel("etcd"), - proxyConfig.GetEndpointsConfigurationChannel("etcd")) + serviceConfig.Channel("etcd"), + endpointsConfig.Channel("etcd")) // And create a configuration source that reads from a local file config.NewConfigSourceFile(*configFile, - proxyConfig.GetServiceConfigurationChannel("file"), - proxyConfig.GetEndpointsConfigurationChannel("file")) + serviceConfig.Channel("file"), + endpointsConfig.Channel("file")) loadBalancer := proxy.NewLoadBalancerRR() proxier := proxy.NewProxier(loadBalancer) // Wire proxier to handle changes to services - proxyConfig.RegisterServiceHandler(proxier) + serviceConfig.RegisterHandler(proxier) // And wire loadBalancer to handle changes to endpoints to services - proxyConfig.RegisterEndpointsHandler(loadBalancer) + endpointsConfig.RegisterHandler(loadBalancer) // Just loop forever for now... select {} - } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 7f17d71d6f..5fab579b6c 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -18,9 +18,9 @@ package config import ( "sync" - "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" "github.com/golang/glog" ) @@ -69,260 +69,200 @@ type EndpointsConfigHandler interface { OnUpdate(endpoints []api.Endpoints) } -// ServiceConfig tracks a set of service configurations and their endpoint configurations. -// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change. +// EndpointsConfig tracks a set of endpoints configurations. +// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. +type EndpointsConfig struct { + mux *config.Mux + watcher *config.Watcher + store *endpointsStore +} + +// NewEndpointConfig creates a new EndpointConfig. +// It immediately runs the created EndpointConfig. +func NewEndpointsConfig() *EndpointsConfig { + updates := make(chan struct{}) + store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)} + mux := config.NewMux(store) + watcher := config.NewWatcher() + go watchForUpdates(watcher, store, updates) + return &EndpointsConfig{mux, watcher, store} +} + +func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { + c.watcher.Add(config.ListenerFunc(func(instance interface{}) { + handler.OnUpdate(instance.([]api.Endpoints)) + })) +} + +func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate { + ch := c.mux.Channel(source) + endpointsCh := make(chan EndpointsUpdate) + go func() { + for update := range endpointsCh { + ch <- update + } + close(ch) + }() + return endpointsCh +} + +func (c *EndpointsConfig) Config() map[string]map[string]api.Endpoints { + return c.store.MergedState().(map[string]map[string]api.Endpoints) +} + +type endpointsStore struct { + endpointLock sync.RWMutex + endpoints map[string]map[string]api.Endpoints + updates chan<- struct{} +} + +func (s *endpointsStore) Merge(source string, change interface{}) error { + s.endpointLock.Lock() + endpoints := s.endpoints[source] + if endpoints == nil { + endpoints = make(map[string]api.Endpoints) + } + update := change.(EndpointsUpdate) + switch update.Op { + case ADD: + glog.Infof("Adding new endpoint from source %s : %v", source, update.Endpoints) + for _, value := range update.Endpoints { + endpoints[value.Name] = value + } + case REMOVE: + glog.Infof("Removing an endpoint %v", update) + for _, value := range update.Endpoints { + delete(endpoints, value.Name) + } + case SET: + glog.Infof("Setting endpoints %v", update) + // Clear the old map entries by just creating a new map + endpoints = make(map[string]api.Endpoints) + for _, value := range update.Endpoints { + endpoints[value.Name] = value + } + default: + glog.Infof("Received invalid update type: %v", update) + } + s.endpoints[source] = endpoints + s.endpointLock.Unlock() + if s.updates != nil { + s.updates <- struct{}{} + } + return nil +} + +func (s *endpointsStore) MergedState() interface{} { + s.endpointLock.RLock() + defer s.endpointLock.RUnlock() + endpoints := make([]api.Endpoints, 0) + for _, sourceEndpoints := range s.endpoints { + for _, value := range sourceEndpoints { + endpoints = append(endpoints, value) + } + } + return endpoints +} + +// ServiceConfig tracks a set of service configurations. +// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { - // Configuration sources and their lock. - configSourceLock sync.RWMutex - serviceConfigSources map[string]chan ServiceUpdate - endpointsConfigSources map[string]chan EndpointsUpdate - - // Handlers for changes to services and endpoints and their lock. - handlerLock sync.RWMutex - serviceHandlers []ServiceConfigHandler - endpointHandlers []EndpointsConfigHandler - - // Last known configuration for union of the sources and the locks. Map goes - // from each source to array of services/endpoints that have been configured - // through that channel. - configLock sync.RWMutex - serviceConfig map[string]map[string]api.Service - endpointConfig map[string]map[string]api.Endpoints - - // Channel that service configuration source listeners use to signal of new - // configurations. - // Value written is the source of the change. - serviceNotifyChannel chan string - - // Channel that endpoint configuration source listeners use to signal of new - // configurations. - // Value written is the source of the change. - endpointsNotifyChannel chan string + mux *config.Mux + watcher *config.Watcher + store *serviceStore } // NewServiceConfig creates a new ServiceConfig. // It immediately runs the created ServiceConfig. func NewServiceConfig() *ServiceConfig { - config := &ServiceConfig{ - serviceConfigSources: make(map[string]chan ServiceUpdate), - endpointsConfigSources: make(map[string]chan EndpointsUpdate), - serviceHandlers: make([]ServiceConfigHandler, 10), - endpointHandlers: make([]EndpointsConfigHandler, 10), - serviceConfig: make(map[string]map[string]api.Service), - endpointConfig: make(map[string]map[string]api.Endpoints), - serviceNotifyChannel: make(chan string), - endpointsNotifyChannel: make(chan string), - } - go config.Run() - return config + updates := make(chan struct{}) + store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)} + mux := config.NewMux(store) + watcher := config.NewWatcher() + go watchForUpdates(watcher, store, updates) + return &ServiceConfig{mux, watcher, store} } -// Run begins a loop to accept new service configurations and new endpoint configurations. -// It never returns. -func (impl *ServiceConfig) Run() { - glog.Infof("Starting the config Run loop") - for { - select { - case source := <-impl.serviceNotifyChannel: - glog.Infof("Got new service configuration from source %s", source) - impl.notifyServiceUpdate() - case source := <-impl.endpointsNotifyChannel: - glog.Infof("Got new endpoint configuration from source %s", source) - impl.notifyEndpointsUpdate() - case <-time.After(1 * time.Second): +func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { + c.watcher.Add(config.ListenerFunc(func(instance interface{}) { + handler.OnUpdate(instance.([]api.Service)) + })) +} + +func (c *ServiceConfig) Channel(source string) chan ServiceUpdate { + ch := c.mux.Channel(source) + serviceCh := make(chan ServiceUpdate) + go func() { + for update := range serviceCh { + ch <- update } - } + close(ch) + }() + return serviceCh } -// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel. -// It never returns. -func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) { - // Represents the current services configuration for this channel. - serviceMap := make(map[string]api.Service) - for { - select { - case update := <-listenChannel: - impl.configLock.Lock() - switch update.Op { - case ADD: - glog.Infof("Adding new service from source %s : %v", source, update.Services) - for _, value := range update.Services { - serviceMap[value.ID] = value - } - case REMOVE: - glog.Infof("Removing a service %v", update) - for _, value := range update.Services { - delete(serviceMap, value.ID) - } - case SET: - glog.Infof("Setting services %v", update) - // Clear the old map entries by just creating a new map - serviceMap = make(map[string]api.Service) - for _, value := range update.Services { - serviceMap[value.ID] = value - } - default: - glog.Infof("Received invalid update type: %v", update) - continue - } - impl.serviceConfig[source] = serviceMap - impl.configLock.Unlock() - impl.serviceNotifyChannel <- source +func (c *ServiceConfig) Config() map[string]map[string]api.Service { + return c.store.MergedState().(map[string]map[string]api.Service) +} + +type serviceStore struct { + serviceLock sync.RWMutex + services map[string]map[string]api.Service + updates chan<- struct{} +} + +func (s *serviceStore) Merge(source string, change interface{}) error { + s.serviceLock.Lock() + services := s.services[source] + if services == nil { + services = make(map[string]api.Service) + } + update := change.(ServiceUpdate) + switch update.Op { + case ADD: + glog.Infof("Adding new service from source %s : %v", source, update.Services) + for _, value := range update.Services { + services[value.ID] = value } - } -} - -// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel. -// It never returns. -func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { - endpointMap := make(map[string]api.Endpoints) - for { - select { - case update := <-listenChannel: - impl.configLock.Lock() - switch update.Op { - case ADD: - glog.Infof("Adding a new endpoint %v", update) - for _, value := range update.Endpoints { - endpointMap[value.Name] = value - } - case REMOVE: - glog.Infof("Removing an endpoint %v", update) - for _, value := range update.Endpoints { - delete(endpointMap, value.Name) - } - - case SET: - glog.Infof("Setting services %v", update) - // Clear the old map entries by just creating a new map - endpointMap = make(map[string]api.Endpoints) - for _, value := range update.Endpoints { - endpointMap[value.Name] = value - } - default: - glog.Infof("Received invalid update type: %v", update) - continue - } - impl.endpointConfig[source] = endpointMap - impl.configLock.Unlock() - impl.endpointsNotifyChannel <- source + case REMOVE: + glog.Infof("Removing a service %v", update) + for _, value := range update.Services { + delete(services, value.ID) } - - } -} - -// GetServiceConfigurationChannel returns a channel where a configuration source -// can send updates of new service configurations. Multiple calls with the same -// source will return the same channel. This allows change and state based sources -// to use the same channel. Difference source names however will be treated as a -// union. -func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan ServiceUpdate { - if len(source) == 0 { - panic("GetServiceConfigurationChannel given an empty service name") - } - impl.configSourceLock.Lock() - defer impl.configSourceLock.Unlock() - channel, exists := impl.serviceConfigSources[source] - if exists { - return channel - } - newChannel := make(chan ServiceUpdate) - impl.serviceConfigSources[source] = newChannel - go impl.serviceChannelListener(source, newChannel) - return newChannel -} - -// GetEndpointsConfigurationChannel returns a channel where a configuration source -// can send updates of new endpoint configurations. Multiple calls with the same -// source will return the same channel. This allows change and state based sources -// to use the same channel. Difference source names however will be treated as a -// union. -func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan EndpointsUpdate { - if len(source) == 0 { - panic("GetEndpointConfigurationChannel given an empty service name") - } - impl.configSourceLock.Lock() - defer impl.configSourceLock.Unlock() - channel, exists := impl.endpointsConfigSources[source] - if exists { - return channel - } - newChannel := make(chan EndpointsUpdate) - impl.endpointsConfigSources[source] = newChannel - go impl.endpointsChannelListener(source, newChannel) - return newChannel -} - -// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services. -func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) { - impl.handlerLock.Lock() - defer impl.handlerLock.Unlock() - for i, h := range impl.serviceHandlers { - if h == nil { - impl.serviceHandlers[i] = handler - return + case SET: + glog.Infof("Setting services %v", update) + // Clear the old map entries by just creating a new map + services = make(map[string]api.Service) + for _, value := range update.Services { + services[value.ID] = value } + default: + glog.Infof("Received invalid update type: %v", update) } - // TODO(vaikas): Grow the array here instead of panic. - // In practice we are expecting there to be 1 handler anyways, - // so not a big deal for now - panic("Only up to 10 service handlers supported for now") + s.services[source] = services + s.serviceLock.Unlock() + if s.updates != nil { + s.updates <- struct{}{} + } + return nil } -// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services. -func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) { - impl.handlerLock.Lock() - defer impl.handlerLock.Unlock() - for i, h := range impl.endpointHandlers { - if h == nil { - impl.endpointHandlers[i] = handler - return - } - } - // TODO(vaikas): Grow the array here instead of panic. - // In practice we are expecting there to be 1 handler anyways, - // so not a big deal for now - panic("Only up to 10 endpoint handlers supported for now") -} - -// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services. -func (impl *ServiceConfig) notifyServiceUpdate() { - services := []api.Service{} - impl.configLock.RLock() - for _, sourceServices := range impl.serviceConfig { +func (s *serviceStore) MergedState() interface{} { + s.serviceLock.RLock() + defer s.serviceLock.RUnlock() + services := make([]api.Service, 0) + for _, sourceServices := range s.services { for _, value := range sourceServices { services = append(services, value) } } - impl.configLock.RUnlock() - glog.Infof("Unified configuration %+v", services) - impl.handlerLock.RLock() - handlers := impl.serviceHandlers - impl.handlerLock.RUnlock() - for _, handler := range handlers { - if handler != nil { - handler.OnUpdate(services) - } - } + return services } -// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints. -func (impl *ServiceConfig) notifyEndpointsUpdate() { - endpoints := []api.Endpoints{} - impl.configLock.RLock() - for _, sourceEndpoints := range impl.endpointConfig { - for _, value := range sourceEndpoints { - endpoints = append(endpoints, value) - } - } - impl.configLock.RUnlock() - glog.Infof("Unified configuration %+v", endpoints) - impl.handlerLock.RLock() - handlers := impl.endpointHandlers - impl.handlerLock.RUnlock() - for _, handler := range handlers { - if handler != nil { - handler.OnUpdate(endpoints) - } +// watchForUpdates invokes watcher.Notify() with the latest version of an object +// when changes occur. +func watchForUpdates(watcher *config.Watcher, accessor config.Accessor, updates <-chan struct{}) { + for _ = range updates { + watcher.Notify(accessor.MergedState()) } } diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 7074c55e17..755e02c19f 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -14,13 +14,16 @@ See the License for the specific language governing permissions and limitations under the License. */ -package config +package config_test import ( "reflect" + "sort" + "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + . "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" ) const TomcatPort int = 8080 @@ -33,42 +36,82 @@ const MysqlName = "mysql" var MysqlEndpoints = map[string]string{"c0": "1.1.1.1:13306", "c3": "2.2.2.2:13306"} +type sortedServices []api.Service + +func (s sortedServices) Len() int { + return len(s) +} +func (s sortedServices) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s sortedServices) Less(i, j int) bool { + return s[i].JSONBase.ID < s[j].JSONBase.ID +} + type ServiceHandlerMock struct { services []api.Service + updated sync.WaitGroup } -func NewServiceHandlerMock() ServiceHandlerMock { - return ServiceHandlerMock{services: make([]api.Service, 0)} +func NewServiceHandlerMock() *ServiceHandlerMock { + return &ServiceHandlerMock{services: make([]api.Service, 0)} } -func (impl ServiceHandlerMock) OnUpdate(services []api.Service) { - impl.services = services +func (h *ServiceHandlerMock) OnUpdate(services []api.Service) { + sort.Sort(sortedServices(services)) + h.services = services + h.updated.Done() } -func (impl ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) { - if reflect.DeepEqual(impl.services, expectedServices) { - t.Errorf("Services don't match %+v expected: %+v", impl.services, expectedServices) +func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) { + h.updated.Wait() + if !reflect.DeepEqual(h.services, expectedServices) { + t.Errorf("Expected %#v, Got %#v", expectedServices, h.services) } } +func (h *ServiceHandlerMock) Wait(waits int) { + h.updated.Add(waits) +} + +type sortedEndpoints []api.Endpoints + +func (s sortedEndpoints) Len() int { + return len(s) +} +func (s sortedEndpoints) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} +func (s sortedEndpoints) Less(i, j int) bool { + return s[i].Name < s[j].Name +} + type EndpointsHandlerMock struct { endpoints []api.Endpoints + updated sync.WaitGroup } -func NewEndpointsHandlerMock() EndpointsHandlerMock { - return EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)} +func NewEndpointsHandlerMock() *EndpointsHandlerMock { + return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)} } -func (impl EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) { - impl.endpoints = endpoints +func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) { + sort.Sort(sortedEndpoints(endpoints)) + h.endpoints = endpoints + h.updated.Done() } -func (impl EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) { - if reflect.DeepEqual(impl.endpoints, expectedEndpoints) { - t.Errorf("Endpoints don't match %+v", impl.endpoints, expectedEndpoints) +func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) { + h.updated.Wait() + if !reflect.DeepEqual(h.endpoints, expectedEndpoints) { + t.Errorf("Expected %#v, Got %#v", expectedEndpoints, h.endpoints) } } +func (h *EndpointsHandlerMock) Wait(waits int) { + h.updated.Add(waits) +} + func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate { ret := ServiceUpdate{Op: op} ret.Services = make([]api.Service, len(services)) @@ -87,35 +130,12 @@ func CreateEndpointsUpdate(op Operation, endpoints ...api.Endpoints) EndpointsUp return ret } -func TestServiceConfigurationChannels(t *testing.T) { - config := NewServiceConfig() - channelOne := config.GetServiceConfigurationChannel("one") - if channelOne != config.GetServiceConfigurationChannel("one") { - t.Error("Didn't get the same service configuration channel back with the same name") - } - channelTwo := config.GetServiceConfigurationChannel("two") - if channelOne == channelTwo { - t.Error("Got back the same service configuration channel for different names") - } -} - -func TestEndpointConfigurationChannels(t *testing.T) { - config := NewServiceConfig() - channelOne := config.GetEndpointsConfigurationChannel("one") - if channelOne != config.GetEndpointsConfigurationChannel("one") { - t.Error("Didn't get the same endpoint configuration channel back with the same name") - } - channelTwo := config.GetEndpointsConfigurationChannel("two") - if channelOne == channelTwo { - t.Error("Got back the same endpoint configuration channel for different names") - } -} - func TestNewServiceAddedAndNotified(t *testing.T) { config := NewServiceConfig() - channel := config.GetServiceConfigurationChannel("one") + channel := config.Channel("one") handler := NewServiceHandlerMock() - config.RegisterServiceHandler(&handler) + handler.Wait(1) + config.RegisterHandler(handler) serviceUpdate := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10}) channel <- serviceUpdate handler.ValidateServices(t, serviceUpdate.Services) @@ -124,24 +144,28 @@ func TestNewServiceAddedAndNotified(t *testing.T) { func TestServiceAddedRemovedSetAndNotified(t *testing.T) { config := NewServiceConfig() - channel := config.GetServiceConfigurationChannel("one") + channel := config.Channel("one") handler := NewServiceHandlerMock() - config.RegisterServiceHandler(&handler) + config.RegisterHandler(handler) serviceUpdate := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10}) + handler.Wait(1) channel <- serviceUpdate handler.ValidateServices(t, serviceUpdate.Services) serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20}) + handler.Wait(1) channel <- serviceUpdate2 - services := []api.Service{serviceUpdate.Services[0], serviceUpdate2.Services[0]} + services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]} handler.ValidateServices(t, services) serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{JSONBase: api.JSONBase{ID: "foo"}}) + handler.Wait(1) channel <- serviceUpdate3 services = []api.Service{serviceUpdate2.Services[0]} handler.ValidateServices(t, services) serviceUpdate4 := CreateServiceUpdate(SET, api.Service{JSONBase: api.JSONBase{ID: "foobar"}, Port: 99}) + handler.Wait(1) channel <- serviceUpdate4 services = []api.Service{serviceUpdate4.Services[0]} handler.ValidateServices(t, services) @@ -149,89 +173,102 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { config := NewServiceConfig() - channelOne := config.GetServiceConfigurationChannel("one") - channelTwo := config.GetServiceConfigurationChannel("two") + channelOne := config.Channel("one") + channelTwo := config.Channel("two") if channelOne == channelTwo { t.Error("Same channel handed back for one and two") } handler := NewServiceHandlerMock() - config.RegisterServiceHandler(handler) + config.RegisterHandler(handler) serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10}) serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20}) + handler.Wait(2) channelOne <- serviceUpdate1 channelTwo <- serviceUpdate2 - services := []api.Service{serviceUpdate1.Services[0], serviceUpdate2.Services[0]} + services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]} handler.ValidateServices(t, services) } func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) { config := NewServiceConfig() - channelOne := config.GetServiceConfigurationChannel("one") - channelTwo := config.GetServiceConfigurationChannel("two") + channelOne := config.Channel("one") + channelTwo := config.Channel("two") handler := NewServiceHandlerMock() handler2 := NewServiceHandlerMock() - config.RegisterServiceHandler(handler) - config.RegisterServiceHandler(handler2) + config.RegisterHandler(handler) + config.RegisterHandler(handler2) serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10}) serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20}) + handler.Wait(2) + handler2.Wait(2) channelOne <- serviceUpdate1 channelTwo <- serviceUpdate2 - services := []api.Service{serviceUpdate1.Services[0], serviceUpdate2.Services[0]} + services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]} handler.ValidateServices(t, services) handler2.ValidateServices(t, services) } func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { - config := NewServiceConfig() - channelOne := config.GetEndpointsConfigurationChannel("one") - channelTwo := config.GetEndpointsConfigurationChannel("two") + config := NewEndpointsConfig() + channelOne := config.Channel("one") + channelTwo := config.Channel("two") handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() - config.RegisterEndpointsHandler(handler) - config.RegisterEndpointsHandler(handler2) + config.RegisterHandler(handler) + config.RegisterHandler(handler2) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}}) endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}}) + handler.Wait(2) + handler2.Wait(2) channelOne <- endpointsUpdate1 channelTwo <- endpointsUpdate2 - endpoints := []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0]} + endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) } func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { - config := NewServiceConfig() - channelOne := config.GetEndpointsConfigurationChannel("one") - channelTwo := config.GetEndpointsConfigurationChannel("two") + config := NewEndpointsConfig() + channelOne := config.Channel("one") + channelTwo := config.Channel("two") handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() - config.RegisterEndpointsHandler(handler) - config.RegisterEndpointsHandler(handler2) + config.RegisterHandler(handler) + config.RegisterHandler(handler2) endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}}) endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}}) + handler.Wait(2) + handler2.Wait(2) channelOne <- endpointsUpdate1 channelTwo <- endpointsUpdate2 - endpoints := []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0]} + endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Add one more endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foobar", Endpoints: []string{"endpoint5", "endpoint6"}}) + handler.Wait(1) + handler2.Wait(1) channelTwo <- endpointsUpdate3 - endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0], endpointsUpdate3.Endpoints[0]} + endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Update the "foo" service with new endpoints endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint77"}}) + handler.Wait(1) + handler2.Wait(1) channelOne <- endpointsUpdate1 - endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0], endpointsUpdate3.Endpoints[0]} + endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Remove "bar" service endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{Name: "bar"}) + handler.Wait(1) + handler2.Wait(1) channelTwo <- endpointsUpdate2 endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]} diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index 89488fca4f..93f63a165b 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -66,13 +66,13 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, } // Run begins watching for new services and their endpoints on etcd. -func (impl ConfigSourceEtcd) Run() { +func (s ConfigSourceEtcd) Run() { // Initially, just wait for the etcd to come up before doing anything more complicated. var services []api.Service var endpoints []api.Endpoints var err error for { - services, endpoints, err = impl.getServices() + services, endpoints, err = s.GetServices() if err == nil { break } @@ -82,39 +82,39 @@ func (impl ConfigSourceEtcd) Run() { if len(services) > 0 { serviceUpdate := ServiceUpdate{Op: SET, Services: services} - impl.serviceChannel <- serviceUpdate + s.serviceChannel <- serviceUpdate } if len(endpoints) > 0 { endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints} - impl.endpointsChannel <- endpointsUpdate + s.endpointsChannel <- endpointsUpdate } // Ok, so we got something back from etcd. Let's set up a watch for new services, and // their endpoints - go impl.watchForChanges() + go s.WatchForChanges() for { - services, endpoints, err = impl.getServices() + services, endpoints, err = s.GetServices() if err != nil { glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err) } else { if len(services) > 0 { serviceUpdate := ServiceUpdate{Op: SET, Services: services} - impl.serviceChannel <- serviceUpdate + s.serviceChannel <- serviceUpdate } if len(endpoints) > 0 { endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints} - impl.endpointsChannel <- endpointsUpdate + s.endpointsChannel <- endpointsUpdate } } time.Sleep(30 * time.Second) } } -// getServices finds the list of services and their endpoints from etcd. +// GetServices finds the list of services and their endpoints from etcd. // This operation is akin to a set a known good at regular intervals. -func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) { - response, err := impl.client.Get(registryRoot+"/specs", true, false) +func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { + response, err := s.client.Get(registryRoot+"/specs", true, false) if err != nil { glog.Errorf("Failed to get the key %s: %v", registryRoot, err) return make([]api.Service, 0), make([]api.Endpoints, 0), err @@ -133,7 +133,7 @@ func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, erro continue } retServices[i] = svc - endpoints, err := impl.getEndpoints(svc.ID) + endpoints, err := s.GetEndpoints(svc.ID) if err != nil { glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err) } @@ -145,10 +145,10 @@ func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, erro return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot) } -// getEndpoints finds the list of endpoints of the service from etcd. -func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) { +// GetEndpoints finds the list of endpoints of the service from etcd. +func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { key := fmt.Sprintf(registryRoot + "/endpoints/" + service) - response, err := impl.client.Get(key, true, false) + response, err := s.client.Get(key, true, false) if err != nil { glog.Errorf("Failed to get the key: %s %v", key, err) return api.Endpoints{}, err @@ -176,23 +176,23 @@ func parseEndpoints(jsonString string) (api.Endpoints, error) { return e, err } -func (impl ConfigSourceEtcd) watchForChanges() { +func (s ConfigSourceEtcd) WatchForChanges() { glog.Info("Setting up a watch for new services") watchChannel := make(chan *etcd.Response) - go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil) + go s.client.Watch("/registry/services/", 0, true, watchChannel, nil) for { watchResponse := <-watchChannel - impl.processChange(watchResponse) + s.ProcessChange(watchResponse) } } -func (impl ConfigSourceEtcd) processChange(response *etcd.Response) { +func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) { glog.Infof("Processing a change in service configuration... %s", *response) // If it's a new service being added (signified by a localport being added) // then process it as such if strings.Contains(response.Node.Key, "/endpoints/") { - impl.processEndpointResponse(response) + s.ProcessEndpointResponse(response) } else if response.Action == "set" { service, err := etcdResponseToService(response) if err != nil { @@ -202,7 +202,7 @@ func (impl ConfigSourceEtcd) processChange(response *etcd.Response) { glog.Infof("New service added/updated: %#v", service) serviceUpdate := ServiceUpdate{Op: ADD, Services: []api.Service{*service}} - impl.serviceChannel <- serviceUpdate + s.serviceChannel <- serviceUpdate return } if response.Action == "delete" { @@ -210,14 +210,14 @@ func (impl ConfigSourceEtcd) processChange(response *etcd.Response) { if len(parts) == 4 { glog.Infof("Deleting service: %s", parts[3]) serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}} - impl.serviceChannel <- serviceUpdate + s.serviceChannel <- serviceUpdate return } glog.Infof("Unknown service delete: %#v", parts) } } -func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) { +func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { glog.Infof("Processing a change in endpoint configuration... %s", *response) var endpoints api.Endpoints err := json.Unmarshal([]byte(response.Node.Value), &endpoints) @@ -226,5 +226,5 @@ func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) { return } endpointsUpdate := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoints}} - impl.endpointsChannel <- endpointsUpdate + s.endpointsChannel <- endpointsUpdate } diff --git a/pkg/proxy/config/file.go b/pkg/proxy/config/file.go index ac6810bdec..bc728db455 100644 --- a/pkg/proxy/config/file.go +++ b/pkg/proxy/config/file.go @@ -70,16 +70,16 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end } // Run begins watching the config file. -func (impl ConfigSourceFile) Run() { - glog.Infof("Watching file %s", impl.filename) +func (s ConfigSourceFile) Run() { + glog.Infof("Watching file %s", s.filename) var lastData []byte var lastServices []api.Service var lastEndpoints []api.Endpoints for { - data, err := ioutil.ReadFile(impl.filename) + data, err := ioutil.ReadFile(s.filename) if err != nil { - glog.Errorf("Couldn't read file: %s : %v", impl.filename, err) + glog.Errorf("Couldn't read file: %s : %v", s.filename, err) continue } @@ -103,12 +103,12 @@ func (impl ConfigSourceFile) Run() { } if !reflect.DeepEqual(lastServices, newServices) { serviceUpdate := ServiceUpdate{Op: SET, Services: newServices} - impl.serviceChannel <- serviceUpdate + s.serviceChannel <- serviceUpdate lastServices = newServices } if !reflect.DeepEqual(lastEndpoints, newEndpoints) { endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: newEndpoints} - impl.endpointsChannel <- endpointsUpdate + s.endpointsChannel <- endpointsUpdate lastEndpoints = newEndpoints }