Merge pull request #465 from smarterclayton/simplify_config_for_reuse

Extract proxy/config common functions for reuse
pull/6/head
Clayton Coleman 2014-07-16 18:28:07 -04:00
commit 17053f5950
8 changed files with 595 additions and 338 deletions

View File

@ -41,27 +41,27 @@ func main() {
glog.Infof("Using configuration file %s and etcd_servers %s", *configFile, *etcdServers)
proxyConfig := config.NewServiceConfig()
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
// Create a configuration source that handles configuration from etcd.
etcdClient := etcd.NewClient([]string{*etcdServers})
config.NewConfigSourceEtcd(etcdClient,
proxyConfig.GetServiceConfigurationChannel("etcd"),
proxyConfig.GetEndpointsConfigurationChannel("etcd"))
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
// And create a configuration source that reads from a local file
config.NewConfigSourceFile(*configFile,
proxyConfig.GetServiceConfigurationChannel("file"),
proxyConfig.GetEndpointsConfigurationChannel("file"))
serviceConfig.Channel("file"),
endpointsConfig.Channel("file"))
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer)
// Wire proxier to handle changes to services
proxyConfig.RegisterServiceHandler(proxier)
serviceConfig.RegisterHandler(proxier)
// And wire loadBalancer to handle changes to endpoints to services
proxyConfig.RegisterEndpointsHandler(loadBalancer)
endpointsConfig.RegisterHandler(loadBalancer)
// Just loop forever for now...
select {}
}

View File

@ -18,9 +18,9 @@ package config
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
"github.com/golang/glog"
)
@ -69,260 +69,200 @@ type EndpointsConfigHandler interface {
OnUpdate(endpoints []api.Endpoints)
}
// ServiceConfig tracks a set of service configurations and their endpoint configurations.
// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change.
// EndpointsConfig tracks a set of endpoints configurations.
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
type EndpointsConfig struct {
mux *config.Mux
watcher *config.Watcher
store *endpointsStore
}
// NewEndpointConfig creates a new EndpointConfig.
// It immediately runs the created EndpointConfig.
func NewEndpointsConfig() *EndpointsConfig {
updates := make(chan struct{})
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)}
mux := config.NewMux(store)
watcher := config.NewWatcher()
go watchForUpdates(watcher, store, updates)
return &EndpointsConfig{mux, watcher, store}
}
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Endpoints))
}))
}
func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
ch := c.mux.Channel(source)
endpointsCh := make(chan EndpointsUpdate)
go func() {
for update := range endpointsCh {
ch <- update
}
close(ch)
}()
return endpointsCh
}
func (c *EndpointsConfig) Config() map[string]map[string]api.Endpoints {
return c.store.MergedState().(map[string]map[string]api.Endpoints)
}
type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[string]api.Endpoints
updates chan<- struct{}
}
func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock()
endpoints := s.endpoints[source]
if endpoints == nil {
endpoints = make(map[string]api.Endpoints)
}
update := change.(EndpointsUpdate)
switch update.Op {
case ADD:
glog.Infof("Adding new endpoint from source %s : %v", source, update.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
}
case REMOVE:
glog.Infof("Removing an endpoint %v", update)
for _, value := range update.Endpoints {
delete(endpoints, value.Name)
}
case SET:
glog.Infof("Setting endpoints %v", update)
// Clear the old map entries by just creating a new map
endpoints = make(map[string]api.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
}
s.endpoints[source] = endpoints
s.endpointLock.Unlock()
if s.updates != nil {
s.updates <- struct{}{}
}
return nil
}
func (s *endpointsStore) MergedState() interface{} {
s.endpointLock.RLock()
defer s.endpointLock.RUnlock()
endpoints := make([]api.Endpoints, 0)
for _, sourceEndpoints := range s.endpoints {
for _, value := range sourceEndpoints {
endpoints = append(endpoints, value)
}
}
return endpoints
}
// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
// Configuration sources and their lock.
configSourceLock sync.RWMutex
serviceConfigSources map[string]chan ServiceUpdate
endpointsConfigSources map[string]chan EndpointsUpdate
// Handlers for changes to services and endpoints and their lock.
handlerLock sync.RWMutex
serviceHandlers []ServiceConfigHandler
endpointHandlers []EndpointsConfigHandler
// Last known configuration for union of the sources and the locks. Map goes
// from each source to array of services/endpoints that have been configured
// through that channel.
configLock sync.RWMutex
serviceConfig map[string]map[string]api.Service
endpointConfig map[string]map[string]api.Endpoints
// Channel that service configuration source listeners use to signal of new
// configurations.
// Value written is the source of the change.
serviceNotifyChannel chan string
// Channel that endpoint configuration source listeners use to signal of new
// configurations.
// Value written is the source of the change.
endpointsNotifyChannel chan string
mux *config.Mux
watcher *config.Watcher
store *serviceStore
}
// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
config := &ServiceConfig{
serviceConfigSources: make(map[string]chan ServiceUpdate),
endpointsConfigSources: make(map[string]chan EndpointsUpdate),
serviceHandlers: make([]ServiceConfigHandler, 10),
endpointHandlers: make([]EndpointsConfigHandler, 10),
serviceConfig: make(map[string]map[string]api.Service),
endpointConfig: make(map[string]map[string]api.Endpoints),
serviceNotifyChannel: make(chan string),
endpointsNotifyChannel: make(chan string),
}
go config.Run()
return config
updates := make(chan struct{})
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
mux := config.NewMux(store)
watcher := config.NewWatcher()
go watchForUpdates(watcher, store, updates)
return &ServiceConfig{mux, watcher, store}
}
// Run begins a loop to accept new service configurations and new endpoint configurations.
// It never returns.
func (impl *ServiceConfig) Run() {
glog.Infof("Starting the config Run loop")
for {
select {
case source := <-impl.serviceNotifyChannel:
glog.Infof("Got new service configuration from source %s", source)
impl.notifyServiceUpdate()
case source := <-impl.endpointsNotifyChannel:
glog.Infof("Got new endpoint configuration from source %s", source)
impl.notifyEndpointsUpdate()
case <-time.After(1 * time.Second):
}
}
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
handler.OnUpdate(instance.([]api.Service))
}))
}
// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel.
// It never returns.
func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) {
// Represents the current services configuration for this channel.
serviceMap := make(map[string]api.Service)
for {
select {
case update := <-listenChannel:
impl.configLock.Lock()
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
ch := c.mux.Channel(source)
serviceCh := make(chan ServiceUpdate)
go func() {
for update := range serviceCh {
ch <- update
}
close(ch)
}()
return serviceCh
}
func (c *ServiceConfig) Config() map[string]map[string]api.Service {
return c.store.MergedState().(map[string]map[string]api.Service)
}
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[string]api.Service
updates chan<- struct{}
}
func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[string]api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD:
glog.Infof("Adding new service from source %s : %v", source, update.Services)
for _, value := range update.Services {
serviceMap[value.ID] = value
services[value.ID] = value
}
case REMOVE:
glog.Infof("Removing a service %v", update)
for _, value := range update.Services {
delete(serviceMap, value.ID)
delete(services, value.ID)
}
case SET:
glog.Infof("Setting services %v", update)
// Clear the old map entries by just creating a new map
serviceMap = make(map[string]api.Service)
services = make(map[string]api.Service)
for _, value := range update.Services {
serviceMap[value.ID] = value
services[value.ID] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
continue
}
impl.serviceConfig[source] = serviceMap
impl.configLock.Unlock()
impl.serviceNotifyChannel <- source
}
s.services[source] = services
s.serviceLock.Unlock()
if s.updates != nil {
s.updates <- struct{}{}
}
return nil
}
// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel.
// It never returns.
func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) {
endpointMap := make(map[string]api.Endpoints)
for {
select {
case update := <-listenChannel:
impl.configLock.Lock()
switch update.Op {
case ADD:
glog.Infof("Adding a new endpoint %v", update)
for _, value := range update.Endpoints {
endpointMap[value.Name] = value
}
case REMOVE:
glog.Infof("Removing an endpoint %v", update)
for _, value := range update.Endpoints {
delete(endpointMap, value.Name)
}
case SET:
glog.Infof("Setting services %v", update)
// Clear the old map entries by just creating a new map
endpointMap = make(map[string]api.Endpoints)
for _, value := range update.Endpoints {
endpointMap[value.Name] = value
}
default:
glog.Infof("Received invalid update type: %v", update)
continue
}
impl.endpointConfig[source] = endpointMap
impl.configLock.Unlock()
impl.endpointsNotifyChannel <- source
}
}
}
// GetServiceConfigurationChannel returns a channel where a configuration source
// can send updates of new service configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Difference source names however will be treated as a
// union.
func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan ServiceUpdate {
if len(source) == 0 {
panic("GetServiceConfigurationChannel given an empty service name")
}
impl.configSourceLock.Lock()
defer impl.configSourceLock.Unlock()
channel, exists := impl.serviceConfigSources[source]
if exists {
return channel
}
newChannel := make(chan ServiceUpdate)
impl.serviceConfigSources[source] = newChannel
go impl.serviceChannelListener(source, newChannel)
return newChannel
}
// GetEndpointsConfigurationChannel returns a channel where a configuration source
// can send updates of new endpoint configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Difference source names however will be treated as a
// union.
func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan EndpointsUpdate {
if len(source) == 0 {
panic("GetEndpointConfigurationChannel given an empty service name")
}
impl.configSourceLock.Lock()
defer impl.configSourceLock.Unlock()
channel, exists := impl.endpointsConfigSources[source]
if exists {
return channel
}
newChannel := make(chan EndpointsUpdate)
impl.endpointsConfigSources[source] = newChannel
go impl.endpointsChannelListener(source, newChannel)
return newChannel
}
// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services.
func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) {
impl.handlerLock.Lock()
defer impl.handlerLock.Unlock()
for i, h := range impl.serviceHandlers {
if h == nil {
impl.serviceHandlers[i] = handler
return
}
}
// TODO(vaikas): Grow the array here instead of panic.
// In practice we are expecting there to be 1 handler anyways,
// so not a big deal for now
panic("Only up to 10 service handlers supported for now")
}
// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services.
func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) {
impl.handlerLock.Lock()
defer impl.handlerLock.Unlock()
for i, h := range impl.endpointHandlers {
if h == nil {
impl.endpointHandlers[i] = handler
return
}
}
// TODO(vaikas): Grow the array here instead of panic.
// In practice we are expecting there to be 1 handler anyways,
// so not a big deal for now
panic("Only up to 10 endpoint handlers supported for now")
}
// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services.
func (impl *ServiceConfig) notifyServiceUpdate() {
services := []api.Service{}
impl.configLock.RLock()
for _, sourceServices := range impl.serviceConfig {
func (s *serviceStore) MergedState() interface{} {
s.serviceLock.RLock()
defer s.serviceLock.RUnlock()
services := make([]api.Service, 0)
for _, sourceServices := range s.services {
for _, value := range sourceServices {
services = append(services, value)
}
}
impl.configLock.RUnlock()
glog.Infof("Unified configuration %+v", services)
impl.handlerLock.RLock()
handlers := impl.serviceHandlers
impl.handlerLock.RUnlock()
for _, handler := range handlers {
if handler != nil {
handler.OnUpdate(services)
}
}
return services
}
// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints.
func (impl *ServiceConfig) notifyEndpointsUpdate() {
endpoints := []api.Endpoints{}
impl.configLock.RLock()
for _, sourceEndpoints := range impl.endpointConfig {
for _, value := range sourceEndpoints {
endpoints = append(endpoints, value)
}
}
impl.configLock.RUnlock()
glog.Infof("Unified configuration %+v", endpoints)
impl.handlerLock.RLock()
handlers := impl.endpointHandlers
impl.handlerLock.RUnlock()
for _, handler := range handlers {
if handler != nil {
handler.OnUpdate(endpoints)
}
// watchForUpdates invokes watcher.Notify() with the latest version of an object
// when changes occur.
func watchForUpdates(watcher *config.Watcher, accessor config.Accessor, updates <-chan struct{}) {
for _ = range updates {
watcher.Notify(accessor.MergedState())
}
}

View File

@ -14,13 +14,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package config
package config_test
import (
"reflect"
"sort"
"sync"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
. "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
)
const TomcatPort int = 8080
@ -33,42 +36,82 @@ const MysqlName = "mysql"
var MysqlEndpoints = map[string]string{"c0": "1.1.1.1:13306", "c3": "2.2.2.2:13306"}
type sortedServices []api.Service
func (s sortedServices) Len() int {
return len(s)
}
func (s sortedServices) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedServices) Less(i, j int) bool {
return s[i].JSONBase.ID < s[j].JSONBase.ID
}
type ServiceHandlerMock struct {
services []api.Service
updated sync.WaitGroup
}
func NewServiceHandlerMock() ServiceHandlerMock {
return ServiceHandlerMock{services: make([]api.Service, 0)}
func NewServiceHandlerMock() *ServiceHandlerMock {
return &ServiceHandlerMock{services: make([]api.Service, 0)}
}
func (impl ServiceHandlerMock) OnUpdate(services []api.Service) {
impl.services = services
func (h *ServiceHandlerMock) OnUpdate(services []api.Service) {
sort.Sort(sortedServices(services))
h.services = services
h.updated.Done()
}
func (impl ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
if reflect.DeepEqual(impl.services, expectedServices) {
t.Errorf("Services don't match %+v expected: %+v", impl.services, expectedServices)
func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []api.Service) {
h.updated.Wait()
if !reflect.DeepEqual(h.services, expectedServices) {
t.Errorf("Expected %#v, Got %#v", expectedServices, h.services)
}
}
func (h *ServiceHandlerMock) Wait(waits int) {
h.updated.Add(waits)
}
type sortedEndpoints []api.Endpoints
func (s sortedEndpoints) Len() int {
return len(s)
}
func (s sortedEndpoints) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s sortedEndpoints) Less(i, j int) bool {
return s[i].Name < s[j].Name
}
type EndpointsHandlerMock struct {
endpoints []api.Endpoints
updated sync.WaitGroup
}
func NewEndpointsHandlerMock() EndpointsHandlerMock {
return EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
func NewEndpointsHandlerMock() *EndpointsHandlerMock {
return &EndpointsHandlerMock{endpoints: make([]api.Endpoints, 0)}
}
func (impl EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
impl.endpoints = endpoints
func (h *EndpointsHandlerMock) OnUpdate(endpoints []api.Endpoints) {
sort.Sort(sortedEndpoints(endpoints))
h.endpoints = endpoints
h.updated.Done()
}
func (impl EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) {
if reflect.DeepEqual(impl.endpoints, expectedEndpoints) {
t.Errorf("Endpoints don't match %+v", impl.endpoints, expectedEndpoints)
func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) {
h.updated.Wait()
if !reflect.DeepEqual(h.endpoints, expectedEndpoints) {
t.Errorf("Expected %#v, Got %#v", expectedEndpoints, h.endpoints)
}
}
func (h *EndpointsHandlerMock) Wait(waits int) {
h.updated.Add(waits)
}
func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate {
ret := ServiceUpdate{Op: op}
ret.Services = make([]api.Service, len(services))
@ -87,35 +130,12 @@ func CreateEndpointsUpdate(op Operation, endpoints ...api.Endpoints) EndpointsUp
return ret
}
func TestServiceConfigurationChannels(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetServiceConfigurationChannel("one")
if channelOne != config.GetServiceConfigurationChannel("one") {
t.Error("Didn't get the same service configuration channel back with the same name")
}
channelTwo := config.GetServiceConfigurationChannel("two")
if channelOne == channelTwo {
t.Error("Got back the same service configuration channel for different names")
}
}
func TestEndpointConfigurationChannels(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetEndpointsConfigurationChannel("one")
if channelOne != config.GetEndpointsConfigurationChannel("one") {
t.Error("Didn't get the same endpoint configuration channel back with the same name")
}
channelTwo := config.GetEndpointsConfigurationChannel("two")
if channelOne == channelTwo {
t.Error("Got back the same endpoint configuration channel for different names")
}
}
func TestNewServiceAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channel := config.GetServiceConfigurationChannel("one")
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterServiceHandler(&handler)
handler.Wait(1)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
@ -124,24 +144,28 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
config := NewServiceConfig()
channel := config.GetServiceConfigurationChannel("one")
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterServiceHandler(&handler)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
handler.Wait(1)
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20})
handler.Wait(1)
channel <- serviceUpdate2
services := []api.Service{serviceUpdate.Services[0], serviceUpdate2.Services[0]}
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{JSONBase: api.JSONBase{ID: "foo"}})
handler.Wait(1)
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{JSONBase: api.JSONBase{ID: "foobar"}, Port: 99})
handler.Wait(1)
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
handler.ValidateServices(t, services)
@ -149,89 +173,102 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetServiceConfigurationChannel("one")
channelTwo := config.GetServiceConfigurationChannel("two")
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
if channelOne == channelTwo {
t.Error("Same channel handed back for one and two")
}
handler := NewServiceHandlerMock()
config.RegisterServiceHandler(handler)
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20})
handler.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate1.Services[0], serviceUpdate2.Services[0]}
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
handler.ValidateServices(t, services)
}
func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetServiceConfigurationChannel("one")
channelTwo := config.GetServiceConfigurationChannel("two")
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewServiceHandlerMock()
handler2 := NewServiceHandlerMock()
config.RegisterServiceHandler(handler)
config.RegisterServiceHandler(handler2)
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "foo"}, Port: 10})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{JSONBase: api.JSONBase{ID: "bar"}, Port: 20})
handler.Wait(2)
handler2.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate1.Services[0], serviceUpdate2.Services[0]}
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
handler.ValidateServices(t, services)
handler2.ValidateServices(t, services)
}
func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetEndpointsConfigurationChannel("one")
channelTwo := config.GetEndpointsConfigurationChannel("two")
config := NewEndpointsConfig()
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config.RegisterEndpointsHandler(handler)
config.RegisterEndpointsHandler(handler2)
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0]}
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}
func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
config := NewServiceConfig()
channelOne := config.GetEndpointsConfigurationChannel("one")
channelTwo := config.GetEndpointsConfigurationChannel("two")
config := NewEndpointsConfig()
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config.RegisterEndpointsHandler(handler)
config.RegisterEndpointsHandler(handler2)
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint1", "endpoint2"}})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "bar", Endpoints: []string{"endpoint3", "endpoint4"}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0]}
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foobar", Endpoints: []string{"endpoint5", "endpoint6"}})
handler.Wait(1)
handler2.Wait(1)
channelTwo <- endpointsUpdate3
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{Name: "foo", Endpoints: []string{"endpoint77"}})
handler.Wait(1)
handler2.Wait(1)
channelOne <- endpointsUpdate1
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate2.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{Name: "bar"})
handler.Wait(1)
handler2.Wait(1)
channelTwo <- endpointsUpdate2
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}

View File

@ -66,13 +66,13 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate,
}
// Run begins watching for new services and their endpoints on etcd.
func (impl ConfigSourceEtcd) Run() {
func (s ConfigSourceEtcd) Run() {
// Initially, just wait for the etcd to come up before doing anything more complicated.
var services []api.Service
var endpoints []api.Endpoints
var err error
for {
services, endpoints, err = impl.getServices()
services, endpoints, err = s.GetServices()
if err == nil {
break
}
@ -82,39 +82,39 @@ func (impl ConfigSourceEtcd) Run() {
if len(services) > 0 {
serviceUpdate := ServiceUpdate{Op: SET, Services: services}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
}
if len(endpoints) > 0 {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
}
// Ok, so we got something back from etcd. Let's set up a watch for new services, and
// their endpoints
go impl.watchForChanges()
go s.WatchForChanges()
for {
services, endpoints, err = impl.getServices()
services, endpoints, err = s.GetServices()
if err != nil {
glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
} else {
if len(services) > 0 {
serviceUpdate := ServiceUpdate{Op: SET, Services: services}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
}
if len(endpoints) > 0 {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
}
}
time.Sleep(30 * time.Second)
}
}
// getServices finds the list of services and their endpoints from etcd.
// GetServices finds the list of services and their endpoints from etcd.
// This operation is akin to a set a known good at regular intervals.
func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) {
response, err := impl.client.Get(registryRoot+"/specs", true, false)
func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) {
response, err := s.client.Get(registryRoot+"/specs", true, false)
if err != nil {
glog.Errorf("Failed to get the key %s: %v", registryRoot, err)
return make([]api.Service, 0), make([]api.Endpoints, 0), err
@ -133,7 +133,7 @@ func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, erro
continue
}
retServices[i] = svc
endpoints, err := impl.getEndpoints(svc.ID)
endpoints, err := s.GetEndpoints(svc.ID)
if err != nil {
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
}
@ -145,10 +145,10 @@ func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, erro
return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
}
// getEndpoints finds the list of endpoints of the service from etcd.
func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) {
// GetEndpoints finds the list of endpoints of the service from etcd.
func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
key := fmt.Sprintf(registryRoot + "/endpoints/" + service)
response, err := impl.client.Get(key, true, false)
response, err := s.client.Get(key, true, false)
if err != nil {
glog.Errorf("Failed to get the key: %s %v", key, err)
return api.Endpoints{}, err
@ -176,23 +176,23 @@ func parseEndpoints(jsonString string) (api.Endpoints, error) {
return e, err
}
func (impl ConfigSourceEtcd) watchForChanges() {
func (s ConfigSourceEtcd) WatchForChanges() {
glog.Info("Setting up a watch for new services")
watchChannel := make(chan *etcd.Response)
go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil)
go s.client.Watch("/registry/services/", 0, true, watchChannel, nil)
for {
watchResponse := <-watchChannel
impl.processChange(watchResponse)
s.ProcessChange(watchResponse)
}
}
func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
glog.Infof("Processing a change in service configuration... %s", *response)
// If it's a new service being added (signified by a localport being added)
// then process it as such
if strings.Contains(response.Node.Key, "/endpoints/") {
impl.processEndpointResponse(response)
s.ProcessEndpointResponse(response)
} else if response.Action == "set" {
service, err := etcdResponseToService(response)
if err != nil {
@ -202,7 +202,7 @@ func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
glog.Infof("New service added/updated: %#v", service)
serviceUpdate := ServiceUpdate{Op: ADD, Services: []api.Service{*service}}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
return
}
if response.Action == "delete" {
@ -210,14 +210,14 @@ func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
if len(parts) == 4 {
glog.Infof("Deleting service: %s", parts[3])
serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
return
}
glog.Infof("Unknown service delete: %#v", parts)
}
}
func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) {
func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) {
glog.Infof("Processing a change in endpoint configuration... %s", *response)
var endpoints api.Endpoints
err := json.Unmarshal([]byte(response.Node.Value), &endpoints)
@ -226,5 +226,5 @@ func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) {
return
}
endpointsUpdate := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoints}}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
}

View File

@ -70,16 +70,16 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end
}
// Run begins watching the config file.
func (impl ConfigSourceFile) Run() {
glog.Infof("Watching file %s", impl.filename)
func (s ConfigSourceFile) Run() {
glog.Infof("Watching file %s", s.filename)
var lastData []byte
var lastServices []api.Service
var lastEndpoints []api.Endpoints
for {
data, err := ioutil.ReadFile(impl.filename)
data, err := ioutil.ReadFile(s.filename)
if err != nil {
glog.Errorf("Couldn't read file: %s : %v", impl.filename, err)
glog.Errorf("Couldn't read file: %s : %v", s.filename, err)
continue
}
@ -103,12 +103,12 @@ func (impl ConfigSourceFile) Run() {
}
if !reflect.DeepEqual(lastServices, newServices) {
serviceUpdate := ServiceUpdate{Op: SET, Services: newServices}
impl.serviceChannel <- serviceUpdate
s.serviceChannel <- serviceUpdate
lastServices = newServices
}
if !reflect.DeepEqual(lastEndpoints, newEndpoints) {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: newEndpoints}
impl.endpointsChannel <- endpointsUpdate
s.endpointsChannel <- endpointsUpdate
lastEndpoints = newEndpoints
}

140
pkg/util/config/config.go Normal file
View File

@ -0,0 +1,140 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or cied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
type Merger interface {
// Invoked when a change from a source is received. May also function as an incremental
// merger if you wish to consume changes incrementally. Must be reentrant when more than
// one source is defined.
Merge(source string, update interface{}) error
}
// MergeFunc implements the Merger interface
type MergeFunc func(source string, update interface{}) error
func (f MergeFunc) Merge(source string, update interface{}) error {
return f(source, update)
}
// Mux is a class for merging configuration from multiple sources. Changes are
// pushed via channels and sent to the merge function.
type Mux struct {
// Invoked when an update is sent to a source.
merger Merger
// Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
}
// NewMux creates a new mux that can merge changes from multiple sources.
func NewMux(merger Merger) *Mux {
mux := &Mux{
sources: make(map[string]chan interface{}),
merger: merger,
}
return mux
}
// Channel returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
func (m *Mux) Channel(source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go util.Forever(func() { m.listen(source, newChannel) }, 0)
return newChannel
}
func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
m.merger.Merge(source, update)
}
}
// Accessor is an interface for retrieving the current merge state.
type Accessor interface {
// MergedState returns a representation of the current merge state.
// Must be reentrant when more than one source is defined.
MergedState() interface{}
}
// AccessorFunc implements the Accessor interface
type AccessorFunc func() interface{}
func (f AccessorFunc) MergedState() interface{} {
return f()
}
type Listener interface {
// OnUpdate is invoked when a change is made to an object.
OnUpdate(instance interface{})
}
// ListenerFunc receives a representation of the change or object.
type ListenerFunc func(instance interface{})
func (f ListenerFunc) OnUpdate(instance interface{}) {
f(instance)
}
type Watcher struct {
// Listeners for changes and their lock.
listenerLock sync.RWMutex
listeners []Listener
}
// Register a set of listeners that support the Listener interface and
// notify them on changes.
func NewWatcher() *Watcher {
return &Watcher{}
}
// Register Listener to receive updates of changes.
func (m *Watcher) Add(listener Listener) {
m.listenerLock.Lock()
defer m.listenerLock.Unlock()
m.listeners = append(m.listeners, listener)
}
// Notify all listeners
func (m *Watcher) Notify(instance interface{}) {
m.listenerLock.RLock()
listeners := m.listeners
m.listenerLock.RUnlock()
for _, listener := range listeners {
listener.OnUpdate(instance)
}
}

View File

@ -0,0 +1,120 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"reflect"
"testing"
)
func TestConfigurationChannels(t *testing.T) {
mux := NewMux(nil)
channelOne := mux.Channel("one")
if channelOne != mux.Channel("one") {
t.Error("Didn't get the same muxuration channel back with the same name")
}
channelTwo := mux.Channel("two")
if channelOne == channelTwo {
t.Error("Got back the same muxuration channel for different names")
}
}
type MergeMock struct {
source string
update interface{}
t *testing.T
}
func (m MergeMock) Merge(source string, update interface{}) error {
if m.source != source {
m.t.Errorf("Expected %s, Got %s", m.source, source)
}
if !reflect.DeepEqual(m.update, update) {
m.t.Errorf("Expected %s, Got %s", m.update, update)
}
return nil
}
func TestMergeInvoked(t *testing.T) {
merger := MergeMock{"one", "test", t}
mux := NewMux(&merger)
mux.Channel("one") <- "test"
}
func TestMergeFuncInvoked(t *testing.T) {
ch := make(chan bool)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
if source != "one" {
t.Errorf("Expected %s, Got %s", "one", source)
}
if update.(string) != "test" {
t.Errorf("Expected %s, Got %s", "test", update)
}
ch <- true
return nil
}))
mux.Channel("one") <- "test"
<-ch
}
func TestSimultaneousMerge(t *testing.T) {
ch := make(chan bool, 2)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
switch source {
case "one":
if update.(string) != "test" {
t.Errorf("Expected %s, Got %s", "test", update)
}
case "two":
if update.(string) != "test2" {
t.Errorf("Expected %s, Got %s", "test2", update)
}
default:
t.Errorf("Unexpected source, Got %s", update)
}
ch <- true
return nil
}))
source := mux.Channel("one")
source2 := mux.Channel("two")
source <- "test"
source2 <- "test2"
<-ch
<-ch
}
func TestWatcher(t *testing.T) {
watch := NewWatcher()
watch.Notify(struct{}{})
ch := make(chan bool, 2)
watch.Add(ListenerFunc(func(object interface{}) {
if object != "test" {
t.Errorf("Expected %s, Got %s", "test", object)
}
ch <- true
}))
watch.Add(ListenerFunc(func(object interface{}) {
if object != "test" {
t.Errorf("Expected %s, Got %s", "test", object)
}
ch <- true
}))
watch.Notify("test")
<-ch
<-ch
}

20
pkg/util/config/doc.go Normal file
View File

@ -0,0 +1,20 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package config provides utility objects for decoupling sources of configuration and the
// actual configuration state. Consumers must implement the Merger interface to unify
// the sources of change into an object.
package config