From 38ec4ff8c0e87610d1685b30c78da7049e112056 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 3 Jul 2014 21:38:48 -0400 Subject: [PATCH] Create a configuration update utility Extracted from proxy/config --- pkg/util/config/config.go | 140 +++++++++++++++++++++++++++++++++ pkg/util/config/config_test.go | 120 ++++++++++++++++++++++++++++ pkg/util/config/doc.go | 20 +++++ 3 files changed, 280 insertions(+) create mode 100644 pkg/util/config/config.go create mode 100644 pkg/util/config/config_test.go create mode 100644 pkg/util/config/doc.go diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go new file mode 100644 index 0000000000..18fa574704 --- /dev/null +++ b/pkg/util/config/config.go @@ -0,0 +1,140 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or cied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +type Merger interface { + // Invoked when a change from a source is received. May also function as an incremental + // merger if you wish to consume changes incrementally. Must be reentrant when more than + // one source is defined. + Merge(source string, update interface{}) error +} + +// MergeFunc implements the Merger interface +type MergeFunc func(source string, update interface{}) error + +func (f MergeFunc) Merge(source string, update interface{}) error { + return f(source, update) +} + +// Mux is a class for merging configuration from multiple sources. Changes are +// pushed via channels and sent to the merge function. +type Mux struct { + // Invoked when an update is sent to a source. + merger Merger + + // Sources and their lock. + sourceLock sync.RWMutex + // Maps source names to channels + sources map[string]chan interface{} +} + +// NewMux creates a new mux that can merge changes from multiple sources. +func NewMux(merger Merger) *Mux { + mux := &Mux{ + sources: make(map[string]chan interface{}), + merger: merger, + } + return mux +} + +// Channel returns a channel where a configuration source +// can send updates of new configurations. Multiple calls with the same +// source will return the same channel. This allows change and state based sources +// to use the same channel. Different source names however will be treated as a +// union. +func (m *Mux) Channel(source string) chan interface{} { + if len(source) == 0 { + panic("Channel given an empty name") + } + m.sourceLock.Lock() + defer m.sourceLock.Unlock() + channel, exists := m.sources[source] + if exists { + return channel + } + newChannel := make(chan interface{}) + m.sources[source] = newChannel + go util.Forever(func() { m.listen(source, newChannel) }, 0) + return newChannel +} + +func (m *Mux) listen(source string, listenChannel <-chan interface{}) { + for update := range listenChannel { + m.merger.Merge(source, update) + } +} + +// Accessor is an interface for retrieving the current merge state. +type Accessor interface { + // MergedState returns a representation of the current merge state. + // Must be reentrant when more than one source is defined. + MergedState() interface{} +} + +// AccessorFunc implements the Accessor interface +type AccessorFunc func() interface{} + +func (f AccessorFunc) MergedState() interface{} { + return f() +} + +type Listener interface { + // OnUpdate is invoked when a change is made to an object. + OnUpdate(instance interface{}) +} + +// ListenerFunc receives a representation of the change or object. +type ListenerFunc func(instance interface{}) + +func (f ListenerFunc) OnUpdate(instance interface{}) { + f(instance) +} + +type Watcher struct { + // Listeners for changes and their lock. + listenerLock sync.RWMutex + listeners []Listener +} + +// Register a set of listeners that support the Listener interface and +// notify them on changes. +func NewWatcher() *Watcher { + return &Watcher{} +} + +// Register Listener to receive updates of changes. +func (m *Watcher) Add(listener Listener) { + m.listenerLock.Lock() + defer m.listenerLock.Unlock() + m.listeners = append(m.listeners, listener) +} + +// Notify all listeners +func (m *Watcher) Notify(instance interface{}) { + m.listenerLock.RLock() + listeners := m.listeners + m.listenerLock.RUnlock() + for _, listener := range listeners { + listener.OnUpdate(instance) + } +} diff --git a/pkg/util/config/config_test.go b/pkg/util/config/config_test.go new file mode 100644 index 0000000000..fbf1a4b600 --- /dev/null +++ b/pkg/util/config/config_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" +) + +func TestConfigurationChannels(t *testing.T) { + mux := NewMux(nil) + channelOne := mux.Channel("one") + if channelOne != mux.Channel("one") { + t.Error("Didn't get the same muxuration channel back with the same name") + } + channelTwo := mux.Channel("two") + if channelOne == channelTwo { + t.Error("Got back the same muxuration channel for different names") + } +} + +type MergeMock struct { + source string + update interface{} + t *testing.T +} + +func (m MergeMock) Merge(source string, update interface{}) error { + if m.source != source { + m.t.Errorf("Expected %s, Got %s", m.source, source) + } + if !reflect.DeepEqual(m.update, update) { + m.t.Errorf("Expected %s, Got %s", m.update, update) + } + return nil +} + +func TestMergeInvoked(t *testing.T) { + merger := MergeMock{"one", "test", t} + mux := NewMux(&merger) + mux.Channel("one") <- "test" +} + +func TestMergeFuncInvoked(t *testing.T) { + ch := make(chan bool) + mux := NewMux(MergeFunc(func(source string, update interface{}) error { + if source != "one" { + t.Errorf("Expected %s, Got %s", "one", source) + } + if update.(string) != "test" { + t.Errorf("Expected %s, Got %s", "test", update) + } + ch <- true + return nil + })) + mux.Channel("one") <- "test" + <-ch +} + +func TestSimultaneousMerge(t *testing.T) { + ch := make(chan bool, 2) + mux := NewMux(MergeFunc(func(source string, update interface{}) error { + switch source { + case "one": + if update.(string) != "test" { + t.Errorf("Expected %s, Got %s", "test", update) + } + case "two": + if update.(string) != "test2" { + t.Errorf("Expected %s, Got %s", "test2", update) + } + default: + t.Errorf("Unexpected source, Got %s", update) + } + ch <- true + return nil + })) + source := mux.Channel("one") + source2 := mux.Channel("two") + source <- "test" + source2 <- "test2" + <-ch + <-ch +} + +func TestWatcher(t *testing.T) { + watch := NewWatcher() + watch.Notify(struct{}{}) + + ch := make(chan bool, 2) + watch.Add(ListenerFunc(func(object interface{}) { + if object != "test" { + t.Errorf("Expected %s, Got %s", "test", object) + } + ch <- true + })) + watch.Add(ListenerFunc(func(object interface{}) { + if object != "test" { + t.Errorf("Expected %s, Got %s", "test", object) + } + ch <- true + })) + watch.Notify("test") + <-ch + <-ch +} diff --git a/pkg/util/config/doc.go b/pkg/util/config/doc.go new file mode 100644 index 0000000000..7277e7511d --- /dev/null +++ b/pkg/util/config/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package config provides utility objects for decoupling sources of configuration and the +// actual configuration state. Consumers must implement the Merger interface to unify +// the sources of change into an object. +package config