Merge pull request #11577 from wojtek-t/etcd_helper_interface

Prepare for extracting EtcdHelper interface
pull/6/head
Wojciech Tyczynski 2015-07-23 10:33:33 +02:00
commit 6580464ff1
10 changed files with 134 additions and 101 deletions

View File

@ -217,7 +217,7 @@ func (s *APIServer) verifyClusterIPFlags() {
}
func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string, pathPrefix string) (helper tools.EtcdHelper, err error) {
var client tools.EtcdGetSet
var client tools.EtcdClient
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
if err != nil {

View File

@ -37,7 +37,7 @@ type Master string
func (Master) IsAnAPIObject() {}
// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
func NewEtcdMasterElector(h tools.EtcdGetSet) MasterElector {
func NewEtcdMasterElector(h tools.EtcdClient) MasterElector {
return &etcdMasterElector{etcd: h}
}
@ -45,7 +45,7 @@ type empty struct{}
// internal implementation struct
type etcdMasterElector struct {
etcd tools.EtcdGetSet
etcd tools.EtcdClient
done chan empty
events chan watch.Event
}

View File

@ -125,7 +125,7 @@ type KubernetesScheduler struct {
executorGroup uint64
scheduleFunc PodScheduleFunc
client *client.Client
etcdClient tools.EtcdGetSet
etcdClient tools.EtcdClient
failoverTimeout float64 // in seconds
reconcileInterval int64
@ -158,7 +158,7 @@ type Config struct {
Executor *mesos.ExecutorInfo
ScheduleFunc PodScheduleFunc
Client *client.Client
EtcdClient tools.EtcdGetSet
EtcdClient tools.EtcdClient
FailoverTimeout float64
ReconcileInterval int64
ReconcileCooldown time.Duration

View File

@ -525,7 +525,7 @@ func validateLeadershipTransition(desired, current string) {
}
// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tools.EtcdGetSet, err error) {
func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tools.EtcdClient, err error) {
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
} else {
@ -534,7 +534,7 @@ func newEtcd(etcdConfigFile string, etcdServerList util.StringList) (client tool
return
}
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdGetSet, *uid.UID) {
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *uid.UID) {
s.FrameworkName = strings.TrimSpace(s.FrameworkName)
if s.FrameworkName == "" {
@ -737,7 +737,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
return
}
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdGetSet) (*mesos.FrameworkID, error) {
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
if s.FailoverTimeout > 0 {
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
if !tools.IsEtcdNotFound(err) {

View File

@ -229,7 +229,7 @@ type Master struct {
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
// is incorrect.
func NewEtcdHelper(client tools.EtcdGetSet, version string, prefix string) (helper tools.EtcdHelper, err error) {
func NewEtcdHelper(client tools.EtcdClient, version string, prefix string) (helper tools.EtcdHelper, err error) {
if version == "" {
version = latest.Version
}

View File

@ -31,76 +31,26 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/prometheus/client_golang/prometheus"
"github.com/golang/glog"
)
var (
cacheHitCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_hit_count",
Help: "Counter of etcd helper cache hits.",
},
)
cacheMissCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_miss_count",
Help: "Counter of etcd helper cache miss.",
},
)
cacheEntryCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_entry_count",
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
"because two concurrent threads can miss the cache and generate the same entry twice.",
},
)
cacheGetLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_get_latencies_summary",
Help: "Latency in microseconds of getting an object from etcd cache",
},
)
cacheAddLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_add_latencies_summary",
Help: "Latency in microseconds of adding an object to etcd cache",
},
)
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "etcd_request_latencies_summary",
Help: "Etcd request latency summary in microseconds for each operation and object type.",
},
[]string{"operation", "type"},
)
)
const maxEtcdCacheEntries int = 50000
func init() {
prometheus.MustRegister(cacheHitCounter)
prometheus.MustRegister(cacheMissCounter)
prometheus.MustRegister(cacheEntryCounter)
prometheus.MustRegister(cacheAddLatency)
prometheus.MustRegister(cacheGetLatency)
prometheus.MustRegister(etcdRequestLatenciesSummary)
metrics.Register()
}
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func recordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
}
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct {
Client EtcdGetSet
Client EtcdClient
Codec runtime.Codec
Copier runtime.ObjectCopier
// optional, no atomic operations can be performed without this interface
@ -121,7 +71,7 @@ type EtcdHelper struct {
// NewEtcdHelper creates a helper that works against objects that use the internal
// Kubernetes API objects.
// TODO: Refactor to take a runtiem.ObjectCopier
func NewEtcdHelper(client EtcdGetSet, codec runtime.Codec, prefix string) EtcdHelper {
func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
return EtcdHelper{
Client: client,
Codec: codec,
@ -234,7 +184,7 @@ type etcdCache interface {
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
metrics.ObserveGetCache(startTime)
}()
obj, found := h.cache.Get(index)
if found {
@ -245,17 +195,17 @@ func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
glog.Errorf("Error during DeepCopy of cached object: %q", err)
return nil, false
}
cacheHitCounter.Inc()
metrics.ObserveCacheHit()
return objCopy.(runtime.Object), true
}
cacheMissCounter.Inc()
metrics.ObserveCacheMiss()
return nil, false
}
func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
startTime := time.Now()
defer func() {
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
metrics.ObserveAddCache(startTime)
}()
objCopy, err := h.Copier.Copy(obj)
if err != nil {
@ -264,7 +214,7 @@ func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
}
isOverwrite := h.cache.Add(index, objCopy)
if !isOverwrite {
cacheEntryCounter.Inc()
metrics.ObserveNewEntry()
}
}
@ -281,7 +231,7 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(key)
recordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
trace.Step("Etcd node listed")
if err != nil {
return err
@ -310,7 +260,7 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.Client.Get(key, false, false)
recordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
if IsEtcdNotFound(err) {
@ -348,7 +298,7 @@ func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFoun
func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
startTime := time.Now()
response, err := h.Client.Get(key, false, false)
recordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
if err != nil && !IsEtcdNotFound(err) {
return "", nil, nil, err
@ -404,7 +354,7 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
startTime := time.Now()
response, err := h.Client.Create(key, string(data), ttl)
recordEtcdRequestLatency("create", getTypeName(obj), startTime)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
if err != nil {
return err
}
@ -422,7 +372,7 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error {
key = h.PrefixEtcdKey(key)
startTime := time.Now()
_, err := h.Client.Delete(key, recursive)
recordEtcdRequestLatency("delete", "UNKNOWN", startTime)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
@ -435,7 +385,7 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
startTime := time.Now()
response, err := h.Client.Delete(key, false)
recordEtcdRequestLatency("delete", getTypeName(out), startTime)
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
if !IsEtcdNotFound(err) {
// if the object that existed prior to the delete is returned by etcd, update out.
if err != nil || response.PrevNode != nil {
@ -462,7 +412,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
create = false
startTime := time.Now()
response, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
recordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
if err != nil {
return err
}
@ -472,7 +422,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
// Create will fail if a key already exists.
startTime := time.Now()
response, err = h.Client.Create(key, string(data), ttl)
recordEtcdRequestLatency("create", getTypeName(obj), startTime)
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
}
if err != nil {
@ -589,7 +539,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
if index == 0 {
startTime := time.Now()
response, err := h.Client.Create(key, string(data), ttl)
recordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
if IsEtcdNodeExist(err) {
continue
}
@ -604,7 +554,7 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
startTime := time.Now()
// Swap origBody with data, if origBody is the latest etcd data.
response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index)
recordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
if IsEtcdTestFailed(err) {
// Try again.
continue

View File

@ -186,7 +186,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
func (w *etcdWatcher) etcdWatch(client EtcdClient, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdError)
if resourceVersion == 0 {
@ -204,7 +204,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion u
}
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(client EtcdGetSet, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
func etcdGetInitialWatchState(client EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
resp, err := client.Get(key, false, recursive)
if err != nil {
if !IsEtcdNotFound(err) {

View File

@ -18,7 +18,6 @@ package tools
import (
"errors"
"fmt"
"sort"
"sync"
@ -125,14 +124,6 @@ func (f *FakeEtcdClient) updateResponse(key string) {
f.Data[key] = *resp.N
}
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
f.Mutex.Lock()
defer f.Mutex.Unlock()
f.Ix = f.Ix + 1
return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
}
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
if f.Err != nil {
return nil, f.Err

View File

@ -39,7 +39,6 @@ var (
// EtcdClient is an injectable interface for testing.
type EtcdClient interface {
GetCluster() []string
AddChild(key, data string, ttl uint64) (*etcd.Response, error)
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)
@ -50,17 +49,6 @@ type EtcdClient interface {
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
// EtcdGetSet interface exposes only the etcd operations needed by EtcdHelper.
type EtcdGetSet interface {
GetCluster() []string
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)
Delete(key string, recursive bool) (*etcd.Response, error)
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
// EtcdVersioner abstracts setting and retrieving fields from the etcd response onto the object
// or list.
type EtcdVersioner interface {

View File

@ -0,0 +1,104 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
var (
cacheHitCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_hit_count",
Help: "Counter of etcd helper cache hits.",
},
)
cacheMissCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_miss_count",
Help: "Counter of etcd helper cache miss.",
},
)
cacheEntryCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "etcd_helper_cache_entry_count",
Help: "Counter of etcd helper cache entries. This can be different from etcd_helper_cache_miss_count " +
"because two concurrent threads can miss the cache and generate the same entry twice.",
},
)
cacheGetLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_get_latencies_summary",
Help: "Latency in microseconds of getting an object from etcd cache",
},
)
cacheAddLatency = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "etcd_request_cache_add_latencies_summary",
Help: "Latency in microseconds of adding an object to etcd cache",
},
)
etcdRequestLatenciesSummary = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "etcd_request_latencies_summary",
Help: "Etcd request latency summary in microseconds for each operation and object type.",
},
[]string{"operation", "type"},
)
)
var registerMetrics sync.Once
// Register all metrics.
func Register() {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(cacheHitCounter)
prometheus.MustRegister(cacheMissCounter)
prometheus.MustRegister(cacheEntryCounter)
prometheus.MustRegister(cacheAddLatency)
prometheus.MustRegister(cacheGetLatency)
prometheus.MustRegister(etcdRequestLatenciesSummary)
})
}
func RecordEtcdRequestLatency(verb, resource string, startTime time.Time) {
etcdRequestLatenciesSummary.WithLabelValues(verb, resource).Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveGetCache(startTime time.Time) {
cacheGetLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveAddCache(startTime time.Time) {
cacheAddLatency.Observe(float64(time.Since(startTime) / time.Microsecond))
}
func ObserveCacheHit() {
cacheHitCounter.Inc()
}
func ObserveCacheMiss() {
cacheMissCounter.Inc()
}
func ObserveNewEntry() {
cacheEntryCounter.Inc()
}