mirror of https://github.com/k3s-io/k3s
commit
d3243b8778
|
@ -48,11 +48,8 @@ import (
|
|||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
|
@ -99,7 +96,6 @@ type APIServer struct {
|
|||
AdmissionControl string
|
||||
AdmissionControlConfigFile string
|
||||
EtcdServerList []string
|
||||
EtcdConfigFile string
|
||||
EtcdServersOverrides []string
|
||||
EtcdPathPrefix string
|
||||
CorsAllowedOriginList []string
|
||||
|
@ -234,7 +230,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
|
|||
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
|
||||
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
|
||||
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
|
||||
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
|
||||
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
|
||||
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
|
||||
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
|
||||
|
@ -283,36 +278,21 @@ func (s *APIServer) verifyClusterIPFlags() {
|
|||
}
|
||||
}
|
||||
|
||||
type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
|
||||
type newEtcdFunc func([]string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
|
||||
|
||||
func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
|
||||
func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
|
||||
if storageVersion == "" {
|
||||
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
|
||||
}
|
||||
var client tools.EtcdClient
|
||||
if etcdConfigFile != "" {
|
||||
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
etcdClient := etcd.NewClient(etcdServerList)
|
||||
transport := &http.Transport{
|
||||
Dial: forked.Dial,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
MaxIdleConnsPerHost: 500,
|
||||
}
|
||||
etcdClient.SetTransport(transport)
|
||||
client = etcdClient
|
||||
}
|
||||
var storageConfig etcdstorage.EtcdConfig
|
||||
storageConfig.ServerList = etcdServerList
|
||||
storageConfig.Prefix = pathPrefix
|
||||
versionedInterface, err := interfacesFunc(storageVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
etcdStorage = etcdstorage.NewEtcdStorage(client, versionedInterface.Codec, pathPrefix)
|
||||
return etcdStorage, nil
|
||||
storageConfig.Codec = versionedInterface.Codec
|
||||
return storageConfig.NewStorage()
|
||||
}
|
||||
|
||||
// convert to a map between group and groupVersions.
|
||||
|
@ -360,7 +340,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
|
|||
}
|
||||
|
||||
servers := strings.Split(tokens[1], ";")
|
||||
etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
|
||||
etcdOverrideStorage, err := newEtcdFn(servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
|
||||
}
|
||||
|
@ -386,8 +366,8 @@ func (s *APIServer) Run(_ []string) error {
|
|||
}
|
||||
glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress)
|
||||
|
||||
if (s.EtcdConfigFile != "" && len(s.EtcdServerList) != 0) || (s.EtcdConfigFile == "" && len(s.EtcdServerList) == 0) {
|
||||
glog.Fatalf("Specify either --etcd-servers or --etcd-config")
|
||||
if len(s.EtcdServerList) == 0 {
|
||||
glog.Fatalf("--etcd-servers must be specified")
|
||||
}
|
||||
|
||||
if s.KubernetesServiceNodePort > 0 && !s.ServiceNodePortRange.Contains(s.KubernetesServiceNodePort) {
|
||||
|
@ -471,7 +451,7 @@ func (s *APIServer) Run(_ []string) error {
|
|||
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
etcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
|
||||
etcdStorage, err := newEtcd(s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
|
@ -485,7 +465,7 @@ func (s *APIServer) Run(_ []string) error {
|
|||
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
|
||||
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
|
||||
}
|
||||
expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
|
||||
expEtcdStorage, err := newEtcd(s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
|
||||
if err != nil {
|
||||
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
|
||||
newEtcd := func(serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
|
||||
if !reflect.DeepEqual(test.servers, serverList) {
|
||||
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
@ -38,7 +37,7 @@ type Master string
|
|||
func (Master) IsAnAPIObject() {}
|
||||
|
||||
// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
|
||||
func NewEtcdMasterElector(h tools.EtcdClient) MasterElector {
|
||||
func NewEtcdMasterElector(h *etcd.Client) MasterElector {
|
||||
return &etcdMasterElector{etcd: h}
|
||||
}
|
||||
|
||||
|
@ -46,7 +45,7 @@ type empty struct{}
|
|||
|
||||
// internal implementation struct
|
||||
type etcdMasterElector struct {
|
||||
etcd tools.EtcdClient
|
||||
etcd *etcd.Client
|
||||
done chan empty
|
||||
events chan watch.Event
|
||||
}
|
||||
|
|
|
@ -17,12 +17,9 @@ limitations under the License.
|
|||
package election
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
|
@ -57,77 +54,21 @@ func TestEtcdMasterNoOther(t *testing.T) {
|
|||
w.Stop()
|
||||
}
|
||||
|
||||
// MockClient is wrapper aroung tools.EtcdClient.
|
||||
type MockClient struct {
|
||||
client tools.EtcdClient
|
||||
t *testing.T
|
||||
// afterGetFunc is called after each Get() call.
|
||||
afterGetFunc func()
|
||||
calls []string
|
||||
}
|
||||
|
||||
func (m *MockClient) GetCluster() []string {
|
||||
return m.client.GetCluster()
|
||||
}
|
||||
|
||||
func (m *MockClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
|
||||
m.calls = append(m.calls, "get")
|
||||
defer m.afterGetFunc()
|
||||
response, err := m.client.Get(key, sort, recursive)
|
||||
return response, err
|
||||
}
|
||||
|
||||
func (m *MockClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
|
||||
return m.client.Set(key, value, ttl)
|
||||
}
|
||||
|
||||
func (m *MockClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
|
||||
m.calls = append(m.calls, "create")
|
||||
return m.client.Create(key, value, ttl)
|
||||
}
|
||||
|
||||
func (m *MockClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
|
||||
return m.client.CompareAndSwap(key, value, ttl, prevValue, prevIndex)
|
||||
}
|
||||
|
||||
func (m *MockClient) Delete(key string, recursive bool) (*etcd.Response, error) {
|
||||
return m.client.Delete(key, recursive)
|
||||
}
|
||||
|
||||
func (m *MockClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
|
||||
return m.client.Watch(prefix, waitIndex, recursive, receiver, stop)
|
||||
}
|
||||
|
||||
func TestEtcdMasterNoOtherThenConflict(t *testing.T) {
|
||||
server := etcdtesting.NewEtcdTestClientServer(t)
|
||||
defer server.Terminate(t)
|
||||
|
||||
// We set up the following scenario:
|
||||
// - after each Get() call, we write "baz" to a path
|
||||
// - this is simulating someone else writing a data
|
||||
// - the value written by someone else is the new value
|
||||
path := "foo"
|
||||
client := &MockClient{
|
||||
client: server.Client,
|
||||
t: t,
|
||||
afterGetFunc: func() {
|
||||
if _, err := server.Client.Set(path, "baz", 0); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
},
|
||||
calls: make([]string, 0),
|
||||
}
|
||||
master := NewEtcdMasterElector(server.Client)
|
||||
leader := NewEtcdMasterElector(server.Client)
|
||||
|
||||
master := NewEtcdMasterElector(client)
|
||||
w_ldr := leader.Elect(path, "baz")
|
||||
result := <-w_ldr.ResultChan()
|
||||
w := master.Elect(path, "bar")
|
||||
result := <-w.ResultChan()
|
||||
result = <-w.ResultChan()
|
||||
if result.Type != watch.Modified || result.Object.(Master) != "baz" {
|
||||
t.Errorf("unexpected event: %#v", result)
|
||||
}
|
||||
w.Stop()
|
||||
|
||||
expectedCalls := []string{"get", "create", "get"}
|
||||
if !reflect.DeepEqual(client.calls, expectedCalls) {
|
||||
t.Errorf("unexpected calls: %#v", client.calls)
|
||||
}
|
||||
w_ldr.Stop()
|
||||
}
|
||||
|
|
|
@ -75,7 +75,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/healthz"
|
||||
"k8s.io/kubernetes/pkg/master/ports"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
|
||||
// lock to this API version, compilation will fail when this becomes unsupported
|
||||
_ "k8s.io/kubernetes/pkg/api/v1"
|
||||
|
@ -630,7 +629,7 @@ func validateLeadershipTransition(desired, current string) {
|
|||
}
|
||||
|
||||
// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
|
||||
func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdClient, err error) {
|
||||
func newEtcd(etcdConfigFile string, etcdServerList []string) (client *etcd.Client, err error) {
|
||||
if etcdConfigFile != "" {
|
||||
client, err = etcd.NewClientFromFile(etcdConfigFile)
|
||||
} else {
|
||||
|
@ -639,7 +638,7 @@ func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdC
|
|||
return
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *mesos.ExecutorID) {
|
||||
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, *etcd.Client, *mesos.ExecutorID) {
|
||||
s.frameworkName = strings.TrimSpace(s.frameworkName)
|
||||
if s.frameworkName == "" {
|
||||
log.Fatalf("framework-name must be a non-empty string")
|
||||
|
@ -917,7 +916,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
|
|||
return
|
||||
}
|
||||
|
||||
func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
|
||||
func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.FrameworkID, error) {
|
||||
if s.failoverTimeout > 0 {
|
||||
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
|
|
|
@ -64,7 +64,6 @@ kube-apiserver
|
|||
--cloud-provider="": The provider for cloud services. Empty string for no provider.
|
||||
--cluster-name="kubernetes": The instance prefix for the cluster
|
||||
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
|
||||
--etcd-config="": The config file for the etcd client. Mutually exclusive with -etcd-servers.
|
||||
--etcd-prefix="/registry": The prefix for all resource paths in etcd.
|
||||
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
|
||||
--etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.
|
||||
|
@ -107,7 +106,7 @@ kube-apiserver
|
|||
--watch-cache[=true]: Enable watch caching in the apiserver
|
||||
```
|
||||
|
||||
###### Auto generated by spf13/cobra on 3-Dec-2015
|
||||
###### Auto generated by spf13/cobra on 9-Dec-2015
|
||||
|
||||
|
||||
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
|
||||
|
|
|
@ -17,14 +17,15 @@ limitations under the License.
|
|||
package etcd
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
|
@ -32,15 +33,48 @@ import (
|
|||
"k8s.io/kubernetes/pkg/storage"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface {
|
||||
// storage.Config object for etcd.
|
||||
type EtcdConfig struct {
|
||||
ServerList []string
|
||||
Codec runtime.Codec
|
||||
Prefix string
|
||||
}
|
||||
|
||||
// implements storage.Config
|
||||
func (c *EtcdConfig) GetType() string {
|
||||
return "etcd"
|
||||
}
|
||||
|
||||
// implements storage.Config
|
||||
func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
|
||||
etcdClient := etcd.NewClient(c.ServerList)
|
||||
if etcdClient == nil {
|
||||
return nil, errors.New("Failed to create new etcd client from serverlist")
|
||||
}
|
||||
transport := &http.Transport{
|
||||
Dial: forked.Dial,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
},
|
||||
MaxIdleConnsPerHost: 500,
|
||||
}
|
||||
etcdClient.SetTransport(transport)
|
||||
|
||||
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil
|
||||
}
|
||||
|
||||
// Creates a new storage interface from the client
|
||||
// TODO: deprecate in favor of storage.Config abstraction over time
|
||||
func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
|
||||
return &etcdHelper{
|
||||
client: client,
|
||||
codec: codec,
|
||||
|
@ -53,7 +87,7 @@ func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string)
|
|||
|
||||
// etcdHelper is the reference implementation of storage.Interface.
|
||||
type etcdHelper struct {
|
||||
client tools.EtcdClient
|
||||
client *etcd.Client
|
||||
codec runtime.Codec
|
||||
copier runtime.ObjectCopier
|
||||
// optional, has to be set to perform any atomic operations
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -34,10 +35,6 @@ import (
|
|||
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
storagetesting "k8s.io/kubernetes/pkg/storage/testing"
|
||||
|
||||
// TODO: once fakeClient has been purged move utils
|
||||
// and eliminate these deps
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
)
|
||||
|
||||
const validEtcdVersion = "etcd 2.0.9"
|
||||
|
@ -58,7 +55,7 @@ func init() {
|
|||
)
|
||||
}
|
||||
|
||||
func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string) etcdHelper {
|
||||
func newEtcdHelper(client *etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
|
||||
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
|
||||
}
|
||||
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage"
|
||||
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
|
||||
|
@ -119,7 +118,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, e
|
|||
|
||||
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
|
||||
// as a goroutine.
|
||||
func (w *etcdWatcher) etcdWatch(client tools.EtcdClient, key string, resourceVersion uint64) {
|
||||
func (w *etcdWatcher) etcdWatch(client *etcd.Client, key string, resourceVersion uint64) {
|
||||
defer util.HandleCrash()
|
||||
defer close(w.etcdError)
|
||||
if resourceVersion == 0 {
|
||||
|
@ -137,7 +136,7 @@ func (w *etcdWatcher) etcdWatch(client tools.EtcdClient, key string, resourceVer
|
|||
}
|
||||
|
||||
// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
|
||||
func etcdGetInitialWatchState(client tools.EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
func etcdGetInitialWatchState(client *etcd.Client, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
|
||||
resp, err := client.Get(key, false, recursive)
|
||||
if err != nil {
|
||||
if !etcdutil.IsEtcdNotFound(err) {
|
||||
|
|
|
@ -26,8 +26,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/tools"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver"
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp"
|
||||
"github.com/coreos/etcd/pkg/transport"
|
||||
|
@ -40,7 +38,7 @@ import (
|
|||
type EtcdTestServer struct {
|
||||
etcdserver.ServerConfig
|
||||
PeerListeners, ClientListeners []net.Listener
|
||||
Client tools.EtcdClient
|
||||
Client *goetcd.Client
|
||||
|
||||
raftHandler http.Handler
|
||||
s *etcdserver.EtcdServer
|
||||
|
@ -128,7 +126,7 @@ func (m *EtcdTestServer) launch(t *testing.T) error {
|
|||
|
||||
// Terminate will shutdown the running etcd server
|
||||
func (m *EtcdTestServer) Terminate(t *testing.T) {
|
||||
m.Client.(*goetcd.Client).Close()
|
||||
m.Client.Close()
|
||||
m.s.Stop()
|
||||
for _, hs := range m.hss {
|
||||
hs.CloseClientConnections()
|
||||
|
|
|
@ -149,3 +149,13 @@ type Interface interface {
|
|||
// Codec provides access to the underlying codec being used by the implementation.
|
||||
Codec() runtime.Codec
|
||||
}
|
||||
|
||||
// Config interface allows storage tiers to generate the proper storage.interface
|
||||
// and reduce the dependencies to encapsulate storage.
|
||||
type Config interface {
|
||||
// Creates the Interface base on ConfigObject
|
||||
NewStorage() (Interface, error)
|
||||
|
||||
// This function is used to enforce membership, and return the underlying type
|
||||
GetType() string
|
||||
}
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package tools implements types which help work with etcd which depend on the api package.
|
||||
// TODO: move this package to an etcd specific utility package.
|
||||
package tools
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
Copyright 2014 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"github.com/coreos/go-etcd/etcd"
|
||||
)
|
||||
|
||||
// EtcdClient is an injectable interface for testing.
|
||||
type EtcdClient interface {
|
||||
GetCluster() []string
|
||||
Get(key string, sort, recursive bool) (*etcd.Response, error)
|
||||
Set(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
Create(key, value string, ttl uint64) (*etcd.Response, error)
|
||||
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
|
||||
Delete(key string, recursive bool) (*etcd.Response, error)
|
||||
// I'd like to use directional channels here (e.g. <-chan) but this interface mimics
|
||||
// the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api.
|
||||
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
|
||||
}
|
Loading…
Reference in New Issue