mirror of https://github.com/k3s-io/k3s
Merge pull request #4751 from bprashanth/sync_replica_count
Replication Controller syncs current replica count with api server #4429pull/6/head
commit
cd8a979206
|
@ -203,9 +203,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
|||
|
||||
controllerManager := replicationControllerPkg.NewReplicationManager(cl)
|
||||
|
||||
// Prove that controllerManager's watch works by making it not sync until after this
|
||||
// test is over. (Hopefully we don't take 10 minutes!)
|
||||
controllerManager.Run(10 * time.Minute)
|
||||
// TODO: Write an integration test for the replication controllers watch.
|
||||
controllerManager.Run(1 * time.Second)
|
||||
|
||||
nodeResources := &api.NodeResources{}
|
||||
|
||||
|
|
|
@ -196,7 +196,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
|
|||
return err
|
||||
}
|
||||
filteredList := FilterActivePods(podList.Items)
|
||||
diff := len(filteredList) - controller.Spec.Replicas
|
||||
activePods := len(filteredList)
|
||||
diff := activePods - controller.Spec.Replicas
|
||||
if diff < 0 {
|
||||
diff *= -1
|
||||
wait := sync.WaitGroup{}
|
||||
|
@ -221,6 +222,13 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
|
|||
}
|
||||
wait.Wait()
|
||||
}
|
||||
if controller.Status.Replicas != activePods {
|
||||
controller.Status.Replicas = activePods
|
||||
_, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -68,6 +68,8 @@ func (f *FakePodControl) deletePod(namespace string, podName string) error {
|
|||
|
||||
func newReplicationController(replicas int) api.ReplicationController {
|
||||
return api.ReplicationController{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "default", ResourceVersion: "18"},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Replicas: replicas,
|
||||
Template: &api.PodTemplateSpec{
|
||||
|
@ -81,8 +83,14 @@ func newReplicationController(replicas int) api.ReplicationController {
|
|||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
TerminationMessagePath: api.TerminationMessagePathDefault,
|
||||
ImagePullPolicy: api.PullIfNotPresent,
|
||||
},
|
||||
},
|
||||
RestartPolicy: api.RestartPolicy{
|
||||
Always: &api.RestartPolicyAlways{},
|
||||
},
|
||||
DNSPolicy: api.DNSDefault,
|
||||
NodeSelector: map[string]string{
|
||||
"baz": "blah",
|
||||
},
|
||||
|
@ -159,23 +167,37 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
|
|||
|
||||
func TestSyncReplicationControllerCreates(t *testing.T) {
|
||||
body := runtime.EncodeOrDie(testapi.Codec(), newPodList(0))
|
||||
fakeHandler := util.FakeHandler{
|
||||
fakePodHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(body),
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
fakePodControl := FakePodControl{}
|
||||
|
||||
controller := newReplicationController(2)
|
||||
fakeUpdateHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &controller),
|
||||
T: t,
|
||||
}
|
||||
|
||||
testServerMux := http.NewServeMux()
|
||||
testServerMux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
|
||||
testServerMux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", controller.Name), &fakeUpdateHandler)
|
||||
testServer := httptest.NewServer(testServerMux)
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
|
||||
fakePodControl := FakePodControl{}
|
||||
|
||||
manager := NewReplicationManager(client)
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
controllerSpec := newReplicationController(2)
|
||||
|
||||
manager.syncReplicationController(controllerSpec)
|
||||
manager.syncReplicationController(controller)
|
||||
validateSyncReplication(t, &fakePodControl, 2, 0)
|
||||
|
||||
// No Status.Replicas update expected even though 2 pods were just created,
|
||||
// because the controller manager can't observe the pods till the next sync cycle.
|
||||
if fakeUpdateHandler.RequestReceived != nil {
|
||||
t.Errorf("Unexpected updates for controller via %v",
|
||||
fakeUpdateHandler.RequestReceived.URL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateReplica(t *testing.T) {
|
||||
|
@ -193,33 +215,7 @@ func TestCreateReplica(t *testing.T) {
|
|||
kubeClient: client,
|
||||
}
|
||||
|
||||
controllerSpec := api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "test",
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"name": "foo",
|
||||
"type": "production",
|
||||
"replicationController": "test",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
},
|
||||
},
|
||||
NodeSelector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
controllerSpec := newReplicationController(1)
|
||||
podControl.createReplica(ns, controllerSpec)
|
||||
|
||||
manifest := api.ContainerManifest{}
|
||||
|
@ -245,48 +241,13 @@ func TestCreateReplica(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSynchonize(t *testing.T) {
|
||||
controllerSpec1 := api.ReplicationController{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Replicas: 4,
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"name": "foo",
|
||||
"type": "production",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
controllerSpec2 := api.ReplicationController{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Replicas: 3,
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
"name": "bar",
|
||||
"type": "production",
|
||||
},
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "bar/baz",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
func TestSynchronize(t *testing.T) {
|
||||
controllerSpec1 := newReplicationController(4)
|
||||
controllerSpec2 := newReplicationController(3)
|
||||
controllerSpec2.Name = "bar"
|
||||
controllerSpec2.Spec.Template.ObjectMeta.Labels = map[string]string{
|
||||
"name": "bar",
|
||||
"type": "production",
|
||||
}
|
||||
|
||||
fakeEtcd := tools.NewFakeEtcdClient(t)
|
||||
|
@ -339,6 +300,106 @@ func TestSynchonize(t *testing.T) {
|
|||
validateSyncReplication(t, &fakePodControl, 7, 0)
|
||||
}
|
||||
|
||||
func TestControllerNoReplicaUpdate(t *testing.T) {
|
||||
// Steady state for the replication controller, no Status.Replicas updates expected
|
||||
rc := newReplicationController(5)
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 5}
|
||||
activePods := 5
|
||||
|
||||
body, _ := latest.Codec.Encode(newPodList(activePods))
|
||||
fakePodHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(body),
|
||||
T: t,
|
||||
}
|
||||
fakeControllerHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{rc},
|
||||
}),
|
||||
T: t,
|
||||
}
|
||||
fakeUpdateHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc),
|
||||
T: t,
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
|
||||
mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler)
|
||||
mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler)
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
t.Errorf("Unexpected request for %v", req.RequestURI)
|
||||
})
|
||||
testServer := httptest.NewServer(mux)
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
manager := NewReplicationManager(client)
|
||||
fakePodControl := FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
manager.synchronize()
|
||||
|
||||
validateSyncReplication(t, &fakePodControl, 0, 0)
|
||||
if fakeUpdateHandler.RequestReceived != nil {
|
||||
t.Errorf("Unexpected updates for controller via %v",
|
||||
fakeUpdateHandler.RequestReceived.URL)
|
||||
}
|
||||
}
|
||||
|
||||
func TestControllerUpdateReplicas(t *testing.T) {
|
||||
// Insufficient number of pods in the system, and Status.Replicas is wrong;
|
||||
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
|
||||
rc := newReplicationController(5)
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
|
||||
activePods := 4
|
||||
|
||||
body, _ := latest.Codec.Encode(newPodList(activePods))
|
||||
fakePodHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(body),
|
||||
T: t,
|
||||
}
|
||||
fakeControllerHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{
|
||||
Items: []api.ReplicationController{rc},
|
||||
}),
|
||||
T: t,
|
||||
}
|
||||
fakeUpdateHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc),
|
||||
T: t,
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
|
||||
mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler)
|
||||
mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler)
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
t.Errorf("Unexpected request for %v", req.RequestURI)
|
||||
})
|
||||
testServer := httptest.NewServer(mux)
|
||||
defer testServer.Close()
|
||||
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||
manager := NewReplicationManager(client)
|
||||
fakePodControl := FakePodControl{}
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
manager.synchronize()
|
||||
|
||||
// Status.Replicas should go up from 2->4 even though we created 5-4=1 pod
|
||||
rc.Status = api.ReplicationControllerStatus{Replicas: 4}
|
||||
decRc := runtime.EncodeOrDie(testapi.Codec(), &rc)
|
||||
fakeUpdateHandler.ValidateRequest(t, fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s?namespace=%s", rc.Name, rc.Namespace), "PUT", &decRc)
|
||||
validateSyncReplication(t, &fakePodControl, 1, 0)
|
||||
}
|
||||
|
||||
type FakeWatcher struct {
|
||||
w *watch.FakeWatcher
|
||||
*client.Fake
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
|
||||
rc "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
|
@ -78,7 +77,6 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs.fillCurrentState(ctx, controller)
|
||||
return controller, err
|
||||
}
|
||||
|
||||
|
@ -94,7 +92,6 @@ func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Select
|
|||
filtered := []api.ReplicationController{}
|
||||
for _, controller := range controllers.Items {
|
||||
if label.Matches(labels.Set(controller.Labels)) {
|
||||
rs.fillCurrentState(ctx, &controller)
|
||||
filtered = append(filtered, controller)
|
||||
}
|
||||
}
|
||||
|
@ -133,16 +130,3 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
|
|||
func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
|
||||
return rs.registry.WatchControllers(ctx, label, field, resourceVersion)
|
||||
}
|
||||
|
||||
// TODO #2726: The controller should populate the current state, not the apiserver
|
||||
func (rs *REST) fillCurrentState(ctx api.Context, controller *api.ReplicationController) error {
|
||||
if rs.podLister == nil {
|
||||
return nil
|
||||
}
|
||||
list, err := rs.podLister.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
controller.Status.Replicas = len(rc.FilterActivePods(list.Items))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
|
@ -354,37 +353,6 @@ func (f *fakePodLister) ListPods(ctx api.Context, s labels.Selector) (*api.PodLi
|
|||
return &f.l, f.e
|
||||
}
|
||||
|
||||
func TestFillCurrentState(t *testing.T) {
|
||||
fakeLister := fakePodLister{
|
||||
l: api.PodList{
|
||||
Items: []api.Pod{
|
||||
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
|
||||
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
mockRegistry := registrytest.ControllerRegistry{}
|
||||
storage := REST{
|
||||
registry: &mockRegistry,
|
||||
podLister: &fakeLister,
|
||||
}
|
||||
controller := api.ReplicationController{
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Selector: map[string]string{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
ctx := api.NewContext()
|
||||
storage.fillCurrentState(ctx, &controller)
|
||||
if controller.Status.Replicas != 2 {
|
||||
t.Errorf("expected 2, got: %d", controller.Status.Replicas)
|
||||
}
|
||||
if !reflect.DeepEqual(fakeLister.s, labels.Set(controller.Spec.Selector).AsSelector()) {
|
||||
t.Errorf("unexpected output: %#v %#v", labels.Set(controller.Spec.Selector).AsSelector(), fakeLister.s)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: remove, covered by TestCreate
|
||||
func TestCreateControllerWithGeneratedName(t *testing.T) {
|
||||
storage := NewREST(®istrytest.ControllerRegistry{}, nil)
|
||||
|
|
Loading…
Reference in New Issue