Change proxy config to reuse util/config

Splits endpoint and service configuration into their own objects.  Also makes
the endpoint and service configuration tests correct - there was a race condition
previously that meant tests were passing but not checking correct code.
pull/6/head
Clayton Coleman 2014-07-08 12:55:11 -04:00
parent 38ec4ff8c0
commit 021cf64808
5 changed files with 315 additions and 338 deletions

View File

@ -41,27 +41,27 @@ func main() {
glog.Infof("Using configuration file %s and etcd_servers %s", *configFile, *etcdServers)
proxyConfig := config.NewServiceConfig()
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
// Create a configuration source that handles configuration from etcd.
etcdClient := etcd.NewClient([]string{*etcdServers})
config.NewConfigSourceEtcd(etcdClient,
proxyConfig.GetServiceConfigurationChannel("etcd"),
proxyConfig.GetEndpointsConfigurationChannel("etcd"))
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
// And create a configuration source that reads from a local file
config.NewConfigSourceFile(*configFile,
proxyConfig.GetServiceConfigurationChannel("file"),
proxyConfig.GetEndpointsConfigurationChannel("file"))
serviceConfig.Channel("file"),
endpointsConfig.Channel("file"))
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer)
// Wire proxier to handle changes to services
proxyConfig.RegisterServiceHandler(proxier)
serviceConfig.RegisterHandler(proxier)
// And wire loadBalancer to handle changes to endpoints to services
proxyConfig.RegisterEndpointsHandler(loadBalancer)
endpointsConfig.RegisterHandler(loadBalancer)
// Just loop forever for now...
select {}
}

View File

@ -18,9 +18,9 @@ package config
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
"github.com/golang/glog"
)
@ -69,260 +69,200 @@ type EndpointsConfigHandler interface {
OnUpdate(endpoints []api.Endpoints)
}
// ServiceConfig tracks a set of service configurations and their endpoint configurations.
// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change.
// EndpointsConfig tracks a set of endpoints configurations.
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
type EndpointsConfig struct {
mux *config.Mux
watcher *config.Watcher
store *endpointsStore
}
// NewEndpointConfig creates a new EndpointConfig.
// It immediately runs the created EndpointConfig.
func NewEndpointsConfig() *EndpointsConfig {
updates := make(chan struct{})
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)}
mux := config.NewMux(store)
watcher := config.NewWatcher()
go watchForUpdates(watcher, store, updates)
return &EndpointsConfig{mux, watcher, store}
}
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Endpoints))
}))
}
func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
ch := c.mux.Channel(source)
endpointsCh := make(chan EndpointsUpdate)
go func() {
for update := range endpointsCh {
ch <- update
}
close(ch)
}()
return endpointsCh
}
func (c *EndpointsConfig) Config() map[string]map[string]api.Endpoints {
return c.store.MergedState().(map[string]map[string]api.Endpoints)
}
type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[string]api.Endpoints
updates chan<- struct{}
}
func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock()
endpoints := s.endpoints[source]
if endpoints == nil {
endpoints = make(map[string]api.Endpoints)
}
update := change.(EndpointsUpdate)
switch update.Op {
case ADD:
glog.Infof("Adding new endpoint from source %s : %v", source, update.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
}
case REMOVE:
glog.Infof("Removing an endpoint %v", update)
for _, value := range update.Endpoints {
delete(endpoints, value.Name)
}
case SET:
glog.Infof("Setting endpoints %v", update)
// Clear the old map entries by just creating a new map
endpoints = make(map[string]api.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
}
s.endpoints[source] = endpoints
s.endpointLock.Unlock()
if s.updates != nil {
s.updates <- struct{}{}
}
return nil
}
func (s *endpointsStore) MergedState() interface{} {
s.endpointLock.RLock()
defer s.endpointLock.RUnlock()
endpoints := make([]api.Endpoints, 0)
for _, sourceEndpoints := range s.endpoints {
for _, value := range sourceEndpoints {
endpoints = append(endpoints, value)
}
}
return endpoints
}
// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
// Configuration sources and their lock.
configSourceLock sync.RWMutex
serviceConfigSources map[string]chan ServiceUpdate
endpointsConfigSources map[string]chan EndpointsUpdate
// Handlers for changes to services and endpoints and their lock.
handlerLock sync.RWMutex
serviceHandlers []ServiceConfigHandler
endpointHandlers []EndpointsConfigHandler
// Last known configuration for union of the sources and the locks. Map goes
// from each source to array of services/endpoints that have been configured
// through that channel.
configLock sync.RWMutex
serviceConfig map[string]map[string]api.Service
endpointConfig map[string]map[string]api.Endpoints
// Channel that service configuration source listeners use to signal of new
// configurations.
// Value written is the source of the change.
serviceNotifyChannel chan string
// Channel that endpoint configuration source listeners use to signal of new
// configurations.
// Value written is the source of the change.
endpointsNotifyChannel chan string
mux *config.Mux
watcher *config.Watcher
store *serviceStore
}
// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
config := &ServiceConfig{
serviceConfigSources: make(map[string]chan ServiceUpdate),
endpointsConfigSources: make(map[string]chan EndpointsUpdate),
serviceHandlers: make([]ServiceConfigHandler, 10),
endpointHandlers: make([]EndpointsConfigHandler, 10),
serviceConfig: make(map[string]map[string]api.Service),
endpointConfig: make(map[string]map[string]api.Endpoints),
serviceNotifyChannel: make(chan string),
endpointsNotifyChannel: make(chan string),
}
go config.Run()
return config
updates := make(chan struct{})
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
mux := config.NewMux(store)
watcher := config.NewWatcher()
go watchForUpdates(watcher, store, updates)
return &ServiceConfig{mux, watcher, store}
}
// Run begins a loop to accept new service configurations and new endpoint configurations.
// It never returns.
func (impl *ServiceConfig) Run() {
glog.Infof("Starting the config Run loop")
for {
select {
case source := <-impl.serviceNotifyChannel:
glog.Infof("Got new service configuration from source %s", source)
impl.notifyServiceUpdate()
case source := <-impl.endpointsNotifyChannel:
glog.Infof("Got new endpoint configuration from source %s", source)
impl.notifyEndpointsUpdate()
case <-time.After(1 * time.Second):
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Service))
}))
}
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
ch := c.mux.Channel(source)
serviceCh := make(chan ServiceUpdate)
go func() {
for update := range serviceCh {
ch <- update
}
}
close(ch)
}()
return serviceCh
}
// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel.
// It never returns.
func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) {
// Represents the current services configuration for this channel.
serviceMap := make(map[string]api.Service)
for {
select {
case update := <-listenChannel:
impl.configLock.Lock()
switch update.Op {
case ADD:
glog.Infof("Adding new service from source %s : %v", source, update.Services)
for _, value := range update.Services {
serviceMap[value.ID] = value
}
case REMOVE:
glog.Infof("Removing a service %v", update)
for _, value := range update.Services {
delete(serviceMap, value.ID)
}
case SET:
glog.Infof("Setting services %v", update)
// Clear the old map entries by just creating a new map
serviceMap = make(map[string]api.Service)
for _, value := range update.Services {
serviceMap[value.ID] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
continue
}
impl.serviceConfig[source] = serviceMap
impl.configLock.Unlock()
impl.serviceNotifyChannel <- source
func (c *ServiceConfig) Config() map[string]map[string]api.Service {
return c.store.MergedState().(map[string]map[string]api.Service)
}
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[string]api.Service
updates chan<- struct{}
}
func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[string]api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD:
glog.Infof("Adding new service from source %s : %v", source, update.Services)
for _, value := range update.Services {
services[value.ID] = value
}
}
}
// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel.
// It never returns.
func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) {
endpointMap := make(map[string]api.Endpoints)
for {
select {
case update := <-listenChannel:
impl.configLock.Lock()
switch update.Op {
case ADD:
glog.Infof("Adding a new endpoint %v", update)
for _, value := range update.Endpoints {
endpointMap[value.Name] = value
}
case REMOVE:
glog.Infof("Removing an endpoint %v", update)
for _, value := range update.Endpoints {
delete(endpointMap, value.Name)
}
case SET:
glog.Infof("Setting services %v", update)
// Clear the old map entries by just creating a new map
endpointMap = make(map[string]api.Endpoints)
for _, value := range update.Endpoints {
endpointMap[value.Name] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
continue
}
impl.endpointConfig[source] = endpointMap
impl.configLock.Unlock()
impl.endpointsNotifyChannel <- source
case REMOVE:
glog.Infof("Removing a service %v", update)
for _, value := range update.Services {
delete(services, value.ID)
}
}
}
// GetServiceConfigurationChannel returns a channel where a configuration source
// can send updates of new service configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Difference source names however will be treated as a
// union.
func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan ServiceUpdate {
if len(source) == 0 {
panic("GetServiceConfigurationChannel given an empty service name")
}
impl.configSourceLock.Lock()
defer impl.configSourceLock.Unlock()
channel, exists := impl.serviceConfigSources[source]
if exists {
return channel
}
newChannel := make(chan ServiceUpdate)
impl.serviceConfigSources[source] = newChannel
go impl.serviceChannelListener(source, newChannel)
return newChannel
}
// GetEndpointsConfigurationChannel returns a channel where a configuration source
// can send updates of new endpoint configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Difference source names however will be treated as a
// union.
func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan EndpointsUpdate {
if len(source) == 0 {
panic("GetEndpointConfigurationChannel given an empty service name")
}
impl.configSourceLock.Lock()
defer impl.configSourceLock.Unlock()
channel, exists := impl.endpointsConfigSources[source]
if exists {
return channel
}
newChannel := make(chan EndpointsUpdate)
impl.endpointsConfigSources[source] = newChannel
go impl.endpointsChannelListener(source, newChannel)
return newChannel
}
// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services.
func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) {
impl.handlerLock.Lock()
defer impl.handlerLock.Unlock()
for i, h := range impl.serviceHandlers {
if h == nil {
impl.serviceHandlers[i] = handler
return
case SET:
glog.Infof("Setting services %v", update)
// Clear the old map entries by just creating a new map
services = make(map[string]api.Service)
for _, value := range update.Services {
services[value.ID] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
}
// TODO(vaikas): Grow the array here instead of panic.
// In practice we are expecting there to be 1 handler anyways,
// so not a big deal for now
panic("Only up to 10 service handlers supported for now")
s.services[source] = services
s.serviceLock.Unlock()
if s.updates != nil {
s.updates <- struct{}{}
}
return nil
}
// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services.
func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) {
impl.handlerLock.Lock()
defer impl.handlerLock.Unlock()
for i, h := range impl.endpointHandlers {
if h == nil {
impl.endpointHandlers[i] = handler
return
}
}
// TODO(vaikas): Grow the array here instead of panic.
// In practice we are expecting there to be 1 handler anyways,
// so not a big deal for now
panic("Only up to 10 endpoint handlers supported for now")
}
// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services.
func (impl *ServiceConfig) notifyServiceUpdate() {
services := []api.Service{}
impl.configLock.RLock()
for _, sourceServices := range impl.serviceConfig {
func (s *serviceStore) MergedState() interface{} {
s.serviceLock.RLock()
defer s.serviceLock.RUnlock()
services := make([]api.Service, 0)
for _, sourceServices := range s.services {
for _, value := range sourceServices {
services = append(services, value)
}
}
impl.configLock.RUnlock()
glog.Infof("Unified configuration %+v", services)
impl.handlerLock.RLock()
handlers := impl.serviceHandlers
impl.handlerLock.RUnlock()
for _, handler := range handlers {
if handler != nil {
handler.OnUpdate(services)
}
}
return services
}
// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints.
func (impl *ServiceConfig) notifyEndpointsUpdate() {
endpoints := []api.Endpoints{}
impl.configLock.RLock()
for _, sourceEndpoints := range impl.endpointConfig {
for _, value := range sourceEndpoints {
endpoints = append(endpoints, value)
}
}
impl.configLock.RUnlock()
glog.Infof("Unified configuration %+v", endpoints)
impl.handlerLock.RLock()
handlers := impl.endpointHandlers
impl.handlerLock.RUnlock()
for _, handler := range handlers {
if handler != nil {
handler.OnUpdate(endpoints)
}
// watchForUpdates invokes watcher.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(watcher *config.Watcher, accessor config.Accessor, updates <-chan struct{}) {
for _ = range updates {
watcher.Notify(accessor.MergedState())
}
}

View File

@ -14,13 +14,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package config
package config_test
import (
"reflect"
"sort"
"sync"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
. "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
)
const TomcatPort int = 8080
@ -33,42 +36,82 @@ const MysqlName = "mysql"
var MysqlEndpoints = map[string]string{"c0": "1.1.1.1:13306", "c3": "2.2.2.2:13306"}
type sortedServices []api.Service
func (s sortedServices) Len() int {
return len(s)
}
func (s sortedServices) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedServices) Less(i, j int) bool {
return s[i].JSONBase.ID < s[j].JSONBase.ID
}
type ServiceHandlerMock struct {
services []api.Service
updated sync.WaitGroup
}
func NewServiceHandlerMock() ServiceHandlerMock {
return ServiceHandlerMock{services: make([]api.Service, 0)}
func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{services: make([]api.Service, 0)}
}
func (impl ServiceHandlerMock) OnUpdate(services []api.Service) {
impl.services = services
func (h *ServiceHandlerMock) OnUpdate(services []api.Service) {
sort.Sort(sortedServices(services))
h.services = services
h.updated.Done()
}
func (impl ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
if reflect.DeepEqual(impl.services, expectedServices) {
t.Errorf("Services don't match %+v expected: %+v", impl.services, expectedServices)
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
h.updated.Wait()
if !reflect.DeepEqual(h.services, expectedServices) {
t.Errorf("Expected %#v, Got %#v", expectedServices, h.services)
}
}
func (h *ServiceHandlerMock) Wait(waits int) {
h.updated.Add(waits)
}
type sortedEndpoints []api.Endpoints
func (s sortedEndpoints) Len() int {
return len(s)
}
func (s sortedEndpoints) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedEndpoints) Less(i, j int) bool {
return s[i].Name < s[j].Name
}
type EndpointsHandlerMock struct {
endpoints []api.Endpoints
updated sync.WaitGroup
}
func NewEndpointsHandlerMock() EndpointsHandlerMock {
return EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
func NewEndpointsHandlerMock() *EndpointsHandlerMock {
return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
}
func (impl EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
impl.endpoints = endpoints
func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
sort.Sort(sortedEndpoints(endpoints))
h.endpoints = endpoints
h.updated.Done()
}
func (impl EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) {
if reflect.DeepEqual(impl.endpoints, expectedEndpoints) {
t.Errorf("Endpoints don't match %+v", impl.endpoints, expectedEndpoints)
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) {
h.updated.Wait()
if !reflect.DeepEqual(h.endpoints, expectedEndpoints) {
t.Errorf("Expected %#v, Got %#v", expectedEndpoints, h.endpoints)
}
}
func (h *EndpointsHandlerMock) Wait(waits int) {
h.updated.Add(waits)
}
func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate {
ret := ServiceUpdate{Op: op}
ret.Services = make([]api.Service, len(services))
@ -87,35 +130,12 @@ func CreateEndpointsUpdate(op Operation, endpoints ...api.Endpoints) EndpointsUp
return ret
}
func TestServiceConfigurationChannels(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetServiceConfigurationChannel("one")
if channelOne != config.GetServiceConfigurationChannel("one") {
t.Error("Didn't get the same service configuration channel back with the same name")
}
channelTwo := config.GetServiceConfigurationChannel("two")
if channelOne == channelTwo {
t.Error("Got back the same service configuration channel for different names")
}
}
func TestEndpointConfigurationChannels(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetEndpointsConfigurationChannel("one")
if channelOne != config.GetEndpointsConfigurationChannel("one") {
t.Error("Didn't get the same endpoint configuration channel back with the same name")
}
channelTwo := config.GetEndpointsConfigurationChannel("two")
if channelOne == channelTwo {
t.Error("Got back the same endpoint configuration channel for different names")
}
}
func TestNewServiceAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channel := config.GetServiceConfigurationChannel("one")
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterServiceHandler(&handler)
handler.Wait(1)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
@ -124,24 +144,28 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
config := NewServiceConfig()
channel := config.GetServiceConfigurationChannel("one")
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterServiceHandler(&handler)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
handler.Wait(1)
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20})
handler.Wait(1)
channel <- serviceUpdate2
services := []api.Service{serviceUpdate.Services[0], serviceUpdate2.Services[0]}
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{JSONBase: api.JSONBase{ID: "foo"}})
handler.Wait(1)
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{JSONBase: api.JSONBase{ID: "foobar"}, Port: 99})
handler.Wait(1)
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
handler.ValidateServices(t, services)
@ -149,89 +173,102 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetServiceConfigurationChannel("one")
channelTwo := config.GetServiceConfigurationChannel("two")
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
if channelOne == channelTwo {
t.Error("Same channel handed back for one and two")
}
handler := NewServiceHandlerMock()
config.RegisterServiceHandler(handler)
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20})
handler.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate1.Services[0], serviceUpdate2.Services[0]}
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
handler.ValidateServices(t, services)
}
func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetServiceConfigurationChannel("one")
channelTwo := config.GetServiceConfigurationChannel("two")
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewServiceHandlerMock()
handler2 := NewServiceHandlerMock()
config.RegisterServiceHandler(handler)
config.RegisterServiceHandler(handler2)
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20})
handler.Wait(2)
handler2.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate1.Services[0], serviceUpdate2.Services[0]}
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
handler.ValidateServices(t, services)
handler2.ValidateServices(t, services)
}
func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetEndpointsConfigurationChannel("one")
channelTwo := config.GetEndpointsConfigurationChannel("two")
config := NewEndpointsConfig()
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config.RegisterEndpointsHandler(handler)
config.RegisterEndpointsHandler(handler2)
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0]}
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}
func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetEndpointsConfigurationChannel("one")
channelTwo := config.GetEndpointsConfigurationChannel("two")
config := NewEndpointsConfig()
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config.RegisterEndpointsHandler(handler)
config.RegisterEndpointsHandler(handler2)
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0]}
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foobar", Endpoints: []string{"endpoint5", "endpoint6"}})
handler.Wait(1)
handler2.Wait(1)
channelTwo <- endpointsUpdate3
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint77"}})
handler.Wait(1)
handler2.Wait(1)
channelOne <- endpointsUpdate1
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{Name: "bar"})
handler.Wait(1)
handler2.Wait(1)
channelTwo <- endpointsUpdate2
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}

View File

@ -66,13 +66,13 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate,
}
// Run begins watching for new services and their endpoints on etcd.
func (impl ConfigSourceEtcd) Run() {
func (s ConfigSourceEtcd) Run() {
// Initially, just wait for the etcd to come up before doing anything more complicated.
var services []api.Service
var endpoints []api.Endpoints
var err error
for {
services, endpoints, err = impl.getServices()
services, endpoints, err = s.GetServices()
if err == nil {
break
}
@ -82,39 +82,39 @@ func (impl ConfigSourceEtcd) Run() {
if len(services) > 0 {
serviceUpdate := ServiceUpdate{Op: SET, Services: services}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
}
if len(endpoints) > 0 {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
}
// Ok, so we got something back from etcd. Let's set up a watch for new services, and
// their endpoints
go impl.watchForChanges()
go s.WatchForChanges()
for {
services, endpoints, err = impl.getServices()
services, endpoints, err = s.GetServices()
if err != nil {
glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
} else {
if len(services) > 0 {
serviceUpdate := ServiceUpdate{Op: SET, Services: services}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
}
if len(endpoints) > 0 {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
}
}
time.Sleep(30 * time.Second)
}
}
// getServices finds the list of services and their endpoints from etcd.
// GetServices finds the list of services and their endpoints from etcd.
// This operation is akin to a set a known good at regular intervals.
func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) {
response, err := impl.client.Get(registryRoot+"/specs", true, false)
func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) {
response, err := s.client.Get(registryRoot+"/specs", true, false)
if err != nil {
glog.Errorf("Failed to get the key %s: %v", registryRoot, err)
return make([]api.Service, 0), make([]api.Endpoints, 0), err
@ -133,7 +133,7 @@ func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, erro
continue
}
retServices[i] = svc
endpoints, err := impl.getEndpoints(svc.ID)
endpoints, err := s.GetEndpoints(svc.ID)
if err != nil {
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
}
@ -145,10 +145,10 @@ func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, erro
return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
}
// getEndpoints finds the list of endpoints of the service from etcd.
func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) {
// GetEndpoints finds the list of endpoints of the service from etcd.
func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
key := fmt.Sprintf(registryRoot + "/endpoints/" + service)
response, err := impl.client.Get(key, true, false)
response, err := s.client.Get(key, true, false)
if err != nil {
glog.Errorf("Failed to get the key: %s %v", key, err)
return api.Endpoints{}, err
@ -176,23 +176,23 @@ func parseEndpoints(jsonString string) (api.Endpoints, error) {
return e, err
}
func (impl ConfigSourceEtcd) watchForChanges() {
func (s ConfigSourceEtcd) WatchForChanges() {
glog.Info("Setting up a watch for new services")
watchChannel := make(chan *etcd.Response)
go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil)
go s.client.Watch("/registry/services/", 0, true, watchChannel, nil)
for {
watchResponse := <-watchChannel
impl.processChange(watchResponse)
s.ProcessChange(watchResponse)
}
}
func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
glog.Infof("Processing a change in service configuration... %s", *response)
// If it's a new service being added (signified by a localport being added)
// then process it as such
if strings.Contains(response.Node.Key, "/endpoints/") {
impl.processEndpointResponse(response)
s.ProcessEndpointResponse(response)
} else if response.Action == "set" {
service, err := etcdResponseToService(response)
if err != nil {
@ -202,7 +202,7 @@ func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
glog.Infof("New service added/updated: %#v", service)
serviceUpdate := ServiceUpdate{Op: ADD, Services: []api.Service{*service}}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
return
}
if response.Action == "delete" {
@ -210,14 +210,14 @@ func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
if len(parts) == 4 {
glog.Infof("Deleting service: %s", parts[3])
serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
return
}
glog.Infof("Unknown service delete: %#v", parts)
}
}
func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) {
func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) {
glog.Infof("Processing a change in endpoint configuration... %s", *response)
var endpoints api.Endpoints
err := json.Unmarshal([]byte(response.Node.Value), &endpoints)
@ -226,5 +226,5 @@ func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) {
return
}
endpointsUpdate := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoints}}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
}

View File

@ -70,16 +70,16 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end
}
// Run begins watching the config file.
func (impl ConfigSourceFile) Run() {
glog.Infof("Watching file %s", impl.filename)
func (s ConfigSourceFile) Run() {
glog.Infof("Watching file %s", s.filename)
var lastData []byte
var lastServices []api.Service
var lastEndpoints []api.Endpoints
for {
data, err := ioutil.ReadFile(impl.filename)
data, err := ioutil.ReadFile(s.filename)
if err != nil {
glog.Errorf("Couldn't read file: %s : %v", impl.filename, err)
glog.Errorf("Couldn't read file: %s : %v", s.filename, err)
continue
}
@ -103,12 +103,12 @@ func (impl ConfigSourceFile) Run() {
}
if !reflect.DeepEqual(lastServices, newServices) {
serviceUpdate := ServiceUpdate{Op: SET, Services: newServices}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
lastServices = newServices
}
if !reflect.DeepEqual(lastEndpoints, newEndpoints) {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: newEndpoints}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
lastEndpoints = newEndpoints
}