mirror of https://github.com/k3s-io/k3s
commit
21f750fdc3
|
@ -26,20 +26,18 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
// ControllerPath is the path to controller resources in etcd
|
||||
ControllerPath string = "/controllers"
|
||||
// ServicePath is the path to service resources in etcd
|
||||
ServicePath string = "/services/specs"
|
||||
)
|
||||
|
||||
// TODO(wojtek-t): Change it to use rest.StandardStorage (as everything else)
|
||||
// and move it to service/ directory.
|
||||
|
||||
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
|
||||
// kubelet (and vice versa)
|
||||
|
||||
|
@ -85,105 +83,6 @@ func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error)
|
|||
return key, nil
|
||||
}
|
||||
|
||||
// ListControllers obtains a list of ReplicationControllers.
|
||||
func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerList, error) {
|
||||
controllers := &api.ReplicationControllerList{}
|
||||
key := makeControllerListKey(ctx)
|
||||
err := r.ExtractToList(key, controllers)
|
||||
return controllers, err
|
||||
}
|
||||
|
||||
// WatchControllers begins watching for new, changed, or deleted controllers.
|
||||
func (r *Registry) WatchControllers(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
if !field.Empty() {
|
||||
return nil, fmt.Errorf("field selectors are not supported on replication controllers")
|
||||
}
|
||||
version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key := makeControllerListKey(ctx)
|
||||
return r.WatchList(key, version, func(obj runtime.Object) bool {
|
||||
controller, ok := obj.(*api.ReplicationController)
|
||||
if !ok {
|
||||
// Must be an error: return true to propagate to upper level.
|
||||
return true
|
||||
}
|
||||
match := label.Matches(labels.Set(controller.Labels))
|
||||
if match {
|
||||
pods, err := r.pods.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
|
||||
if err != nil {
|
||||
glog.Warningf("Error listing pods: %v", err)
|
||||
// No object that's useable so drop it on the floor
|
||||
return false
|
||||
}
|
||||
if pods == nil {
|
||||
glog.Warningf("Pods list is nil. This should never happen...")
|
||||
// No object that's useable so drop it on the floor
|
||||
return false
|
||||
}
|
||||
controller.Status.Replicas = len(pods.Items)
|
||||
}
|
||||
return match
|
||||
})
|
||||
}
|
||||
|
||||
// makeControllerListKey constructs etcd paths to controller directories enforcing namespace rules.
|
||||
func makeControllerListKey(ctx api.Context) string {
|
||||
return MakeEtcdListKey(ctx, ControllerPath)
|
||||
}
|
||||
|
||||
// makeControllerKey constructs etcd paths to controller items enforcing namespace rules.
|
||||
func makeControllerKey(ctx api.Context, id string) (string, error) {
|
||||
return MakeEtcdItemKey(ctx, ControllerPath, id)
|
||||
}
|
||||
|
||||
// GetController gets a specific ReplicationController specified by its ID.
|
||||
func (r *Registry) GetController(ctx api.Context, controllerID string) (*api.ReplicationController, error) {
|
||||
var controller api.ReplicationController
|
||||
key, err := makeControllerKey(ctx, controllerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = r.ExtractObj(key, &controller, false)
|
||||
if err != nil {
|
||||
return nil, etcderr.InterpretGetError(err, "replicationController", controllerID)
|
||||
}
|
||||
return &controller, nil
|
||||
}
|
||||
|
||||
// CreateController creates a new ReplicationController.
|
||||
func (r *Registry) CreateController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) {
|
||||
key, err := makeControllerKey(ctx, controller.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := &api.ReplicationController{}
|
||||
err = r.CreateObj(key, controller, out, 0)
|
||||
return out, etcderr.InterpretCreateError(err, "replicationController", controller.Name)
|
||||
}
|
||||
|
||||
// UpdateController replaces an existing ReplicationController.
|
||||
func (r *Registry) UpdateController(ctx api.Context, controller *api.ReplicationController) (*api.ReplicationController, error) {
|
||||
key, err := makeControllerKey(ctx, controller.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := &api.ReplicationController{}
|
||||
err = r.SetObj(key, controller, out, 0)
|
||||
return out, etcderr.InterpretUpdateError(err, "replicationController", controller.Name)
|
||||
}
|
||||
|
||||
// DeleteController deletes a ReplicationController specified by its ID.
|
||||
func (r *Registry) DeleteController(ctx api.Context, controllerID string) error {
|
||||
key, err := makeControllerKey(ctx, controllerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = r.Delete(key, false)
|
||||
return etcderr.InterpretDeleteError(err, "replicationController", controllerID)
|
||||
}
|
||||
|
||||
// makePodListKey constructs etcd paths to service directories enforcing namespace rules.
|
||||
func makeServiceListKey(ctx api.Context) string {
|
||||
return MakeEtcdListKey(ctx, ServicePath)
|
||||
|
|
|
@ -19,7 +19,6 @@ package etcd
|
|||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
|
@ -52,26 +51,6 @@ func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
|
|||
return registry
|
||||
}
|
||||
|
||||
func TestEtcdListControllersNotFound(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makeControllerListKey(ctx)
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
}
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
controllers, err := registry.ListControllers(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(controllers.Items) != 0 {
|
||||
t.Errorf("Unexpected controller list: %#v", controllers)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdListServicesNotFound(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
|
@ -88,317 +67,7 @@ func TestEtcdListServicesNotFound(t *testing.T) {
|
|||
}
|
||||
|
||||
if len(services.Items) != 0 {
|
||||
t.Errorf("Unexpected controller list: %#v", services)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdListControllers(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
ctx := api.NewDefaultContext()
|
||||
key := makeControllerListKey(ctx)
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Nodes: []*etcd.Node{
|
||||
{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}),
|
||||
},
|
||||
{
|
||||
Value: runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "bar"}}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
E: nil,
|
||||
}
|
||||
controllers, err := registry.ListControllers(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(controllers.Items) != 2 || controllers.Items[0].Name != "foo" || controllers.Items[1].Name != "bar" {
|
||||
t.Errorf("Unexpected controller list: %#v", controllers)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEtcdGetControllerDifferentNamespace ensures same-name controllers in different namespaces do not clash
|
||||
func TestEtcdGetControllerDifferentNamespace(t *testing.T) {
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
|
||||
ctx1 := api.NewDefaultContext()
|
||||
ctx2 := api.WithNamespace(api.NewContext(), "other")
|
||||
|
||||
key1, _ := makeControllerKey(ctx1, "foo")
|
||||
key2, _ := makeControllerKey(ctx2, "foo")
|
||||
|
||||
key1 = etcdtest.AddPrefix(key1)
|
||||
key2 = etcdtest.AddPrefix(key2)
|
||||
|
||||
fakeClient.Set(key1, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: "default", Name: "foo"}}), 0)
|
||||
fakeClient.Set(key2, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: "other", Name: "foo"}}), 0)
|
||||
|
||||
ctrl1, err := registry.GetController(ctx1, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if ctrl1.Name != "foo" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl1)
|
||||
}
|
||||
if ctrl1.Namespace != "default" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl1)
|
||||
}
|
||||
|
||||
ctrl2, err := registry.GetController(ctx2, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if ctrl2.Name != "foo" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl2)
|
||||
}
|
||||
if ctrl2.Namespace != "other" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdGetController(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
ctrl, err := registry.GetController(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if ctrl.Name != "foo" {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdGetControllerNotFound(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Data[key] = tools.EtcdResponseWithError{
|
||||
R: &etcd.Response{
|
||||
Node: nil,
|
||||
},
|
||||
E: tools.EtcdErrorNotFound,
|
||||
}
|
||||
ctrl, err := registry.GetController(ctx, "foo")
|
||||
if ctrl != nil {
|
||||
t.Errorf("Unexpected non-nil controller: %#v", ctrl)
|
||||
}
|
||||
if !errors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error returned: %#v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdDeleteController(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
err := registry.DeleteController(ctx, "foo")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if len(fakeClient.DeletedKeys) != 1 {
|
||||
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
|
||||
}
|
||||
if fakeClient.DeletedKeys[0] != key {
|
||||
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreateController(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
_, err := registry.CreateController(ctx, &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
resp, err := fakeClient.Get(key, false, false)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
var ctrl api.ReplicationController
|
||||
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &ctrl)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if ctrl.Name != "foo" {
|
||||
t.Errorf("Unexpected pod: %#v %s", ctrl, resp.Node.Value)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdCreateControllerAlreadyExisting(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
|
||||
_, err := registry.CreateController(ctx, &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
},
|
||||
})
|
||||
if !errors.IsAlreadyExists(err) {
|
||||
t.Errorf("expected already exists err, got %#v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdUpdateController(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
fakeClient.TestIndex = true
|
||||
key, _ := makeControllerKey(ctx, "foo")
|
||||
key = etcdtest.AddPrefix(key)
|
||||
resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.ReplicationController{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0)
|
||||
_, err := registry.UpdateController(ctx, &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: strconv.FormatUint(resp.Node.ModifiedIndex, 10)},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Replicas: 2,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
ctrl, err := registry.GetController(ctx, "foo")
|
||||
if ctrl.Spec.Replicas != 2 {
|
||||
t.Errorf("Unexpected controller: %#v", ctrl)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdWatchController(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistry(fakeClient)
|
||||
watching, err := registry.WatchControllers(ctx,
|
||||
labels.Everything(),
|
||||
fields.Everything(),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
select {
|
||||
case _, ok := <-watching.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("watching channel should be open")
|
||||
}
|
||||
default:
|
||||
}
|
||||
fakeClient.WatchInjectError <- nil
|
||||
if _, ok := <-watching.ResultChan(); ok {
|
||||
t.Errorf("watching channel should be closed")
|
||||
}
|
||||
watching.Stop()
|
||||
}
|
||||
|
||||
func TestEtcdWatchControllersMatch(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
registry := NewTestEtcdRegistryWithPods(fakeClient)
|
||||
path := etcdgeneric.NamespaceKeyRootFunc(ctx, "/pods")
|
||||
path = etcdtest.AddPrefix(path)
|
||||
fakeClient.ExpectNotFoundGet(path)
|
||||
watching, err := registry.WatchControllers(ctx,
|
||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
||||
fields.Everything(),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
controller := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Labels: map[string]string{
|
||||
"name": "foo",
|
||||
},
|
||||
},
|
||||
}
|
||||
controllerBytes, _ := latest.Codec.Encode(controller)
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(controllerBytes),
|
||||
},
|
||||
}
|
||||
select {
|
||||
case _, ok := <-watching.ResultChan():
|
||||
if !ok {
|
||||
t.Errorf("watching channel should be open")
|
||||
}
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
t.Error("unexpected timeout from result channel")
|
||||
}
|
||||
watching.Stop()
|
||||
}
|
||||
|
||||
func TestEtcdWatchControllersNotMatch(t *testing.T) {
|
||||
ctx := api.NewDefaultContext()
|
||||
fakeClient := tools.NewFakeEtcdClient(t)
|
||||
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))
|
||||
registry := NewTestEtcdRegistryWithPods(fakeClient)
|
||||
watching, err := registry.WatchControllers(ctx,
|
||||
labels.SelectorFromSet(labels.Set{"name": "foo"}),
|
||||
fields.Everything(),
|
||||
"1",
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
fakeClient.WaitForWatchCompletion()
|
||||
|
||||
controller := &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "bar",
|
||||
Labels: map[string]string{
|
||||
"name": "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
controllerBytes, _ := latest.Codec.Encode(controller)
|
||||
fakeClient.WatchResponse <- &etcd.Response{
|
||||
Action: "create",
|
||||
Node: &etcd.Node{
|
||||
Value: string(controllerBytes),
|
||||
},
|
||||
}
|
||||
|
||||
select {
|
||||
case <-watching.ResultChan():
|
||||
t.Error("unexpected result from result channel")
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
// expected case
|
||||
t.Errorf("Unexpected services list: %#v", services)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -203,9 +203,6 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object
|
|||
return r.store.Update(ctx, obj)
|
||||
}
|
||||
|
||||
// Implement GetterWithOptions
|
||||
var _ = rest.GetterWithOptions(&LogREST{})
|
||||
|
||||
// LogREST implements the log endpoint for a Pod
|
||||
type LogREST struct {
|
||||
store *etcdgeneric.Etcd
|
||||
|
|
Loading…
Reference in New Issue