statefulset: use pvc lister, replace legacylisters

Use a PVC lister instead of a client when retrieving PVCs.

Replace unit test's use of legacylisters with the generated listers.
pull/6/head
Andy Goldstein 2017-02-24 09:31:40 -05:00
parent 70a268528e
commit bd912f50ba
8 changed files with 82 additions and 58 deletions

View File

@ -32,6 +32,7 @@ func startStatefulSetController(ctx ControllerContext) (bool, error) {
go statefulset.NewStatefulSetController(
ctx.NewInformerFactory.Core().V1().Pods(),
ctx.NewInformerFactory.Apps().V1beta1().StatefulSets(),
ctx.NewInformerFactory.Core().V1().PersistentVolumeClaims(),
ctx.ClientBuilder.ClientOrDie("statefulset-controller"),
).Run(1, ctx.Stop)
return true, nil

View File

@ -575,6 +575,7 @@ function start_kubelet {
fi
sudo -E "${GO_OUT}/hyperkube" kubelet ${priv_arg}\
--enable-cri=false \
--v=${LOG_LEVEL} \
--chaos-chance="${CHAOS_CHANCE}" \
--container-runtime="${CONTAINER_RUNTIME}" \

View File

@ -62,7 +62,6 @@ go_test(
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/apps/v1beta1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/legacylisters:go_default_library",
"//pkg/client/listers/apps/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",

View File

@ -21,7 +21,6 @@ import (
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorutils "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/pkg/api"
@ -56,8 +55,14 @@ type StatefulPodControlInterface interface {
UpdateStatefulSetReplicas(set *apps.StatefulSet, replicas int32) error
}
func NewRealStatefulPodControl(client clientset.Interface, setLister appslisters.StatefulSetLister, podLister corelisters.PodLister, recorder record.EventRecorder) StatefulPodControlInterface {
return &realStatefulPodControl{client, setLister, podLister, recorder}
func NewRealStatefulPodControl(
client clientset.Interface,
setLister appslisters.StatefulSetLister,
podLister corelisters.PodLister,
pvcLister corelisters.PersistentVolumeClaimLister,
recorder record.EventRecorder,
) StatefulPodControlInterface {
return &realStatefulPodControl{client, setLister, podLister, pvcLister, recorder}
}
// realStatefulPodControl implements StatefulPodControlInterface using a clientset.Interface to communicate with the
@ -66,6 +71,7 @@ type realStatefulPodControl struct {
client clientset.Interface
setLister appslisters.StatefulSetLister
podLister corelisters.PodLister
pvcLister corelisters.PersistentVolumeClaimLister
recorder record.EventRecorder
}
@ -209,18 +215,19 @@ func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.State
func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
var errs []error
for _, claim := range getPersistentVolumeClaims(set, pod) {
_, err := spc.client.Core().PersistentVolumeClaims(claim.Namespace).Get(claim.Name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
_, err := spc.client.Core().PersistentVolumeClaims(claim.Namespace).Create(&claim)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to create PVC %s: %s", claim.Name, err))
}
spc.recordClaimEvent("create", set, pod, &claim, err)
} else {
errs = append(errs, fmt.Errorf("Failed to retrieve PVC %s: %s", claim.Name, err))
_, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
switch {
case apierrors.IsNotFound(err):
_, err := spc.client.Core().PersistentVolumeClaims(claim.Namespace).Create(&claim)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to create PVC %s: %s", claim.Name, err))
}
if err == nil || !apierrors.IsAlreadyExists(err) {
spc.recordClaimEvent("create", set, pod, &claim, err)
}
case err != nil:
errs = append(errs, fmt.Errorf("Failed to retrieve PVC %s: %s", claim.Name, err))
spc.recordClaimEvent("create", set, pod, &claim, err)
}
// TODO: Check resource requirements and accessmodes, update if necessary
}

View File

@ -40,7 +40,9 @@ func TestStatefulPodControlCreatesPods(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
})
@ -71,12 +73,14 @@ func TestStatefulPodControlCreatePodExists(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcs := getPersistentVolumeClaims(set, pod)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
claim := pvcs[action.GetResource().GroupResource().Resource]
return true, &claim, nil
})
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
for k := range pvcs {
pvc := pvcs[k]
pvcIndexer.Add(&pvc)
}
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
create := action.(core.CreateAction)
return true, create.GetObject(), nil
@ -101,10 +105,9 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
})
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -126,15 +129,23 @@ func TestStatefulPodControlCreatePodPvcCreateFailure(t *testing.T) {
}
}
type fakeIndexer struct {
cache.Indexer
getError error
}
func (f *fakeIndexer) GetByKey(key string) (interface{}, bool, error) {
return nil, false, f.getError
}
func TestStatefulPodControlCreatePodPvcGetFailure(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
pvcIndexer := &fakeIndexer{getError: errors.New("API server down")}
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -161,10 +172,9 @@ func TestStatefulPodControlCreatePodFailed(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
})
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
create := action.(core.CreateAction)
return true, create.GetObject(), nil
@ -192,7 +202,7 @@ func TestStatefulPodControlNoOpUpdate(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
t.Error("no-op update should not make any client invocation")
return true, nil, apierrors.NewInternalError(errors.New("If we are here we have a problem"))
@ -211,7 +221,7 @@ func TestStatefulPodControlUpdatesIdentity(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := fake.NewSimpleClientset(set, pod)
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
var updated *v1.Pod
fakeClient.PrependReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
@ -243,7 +253,7 @@ func TestStatefulPodControlUpdateIdentityFailure(t *testing.T) {
gooPod.Name = "goo-0"
indexer.Add(gooPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
pod.Name = "goo-0"
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
@ -268,7 +278,9 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
pvcs := getPersistentVolumeClaims(set, pod)
volumes := make([]v1.Volume, len(pod.Spec.Volumes))
for i := range pod.Spec.Volumes {
@ -281,9 +293,6 @@ func TestStatefulPodControlUpdatesPodStorage(t *testing.T) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
})
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
@ -316,7 +325,9 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
pvcIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
pvcLister := corelisters.NewPersistentVolumeClaimLister(pvcIndexer)
control := NewRealStatefulPodControl(fakeClient, nil, nil, pvcLister, recorder)
pvcs := getPersistentVolumeClaims(set, pod)
volumes := make([]v1.Volume, len(pod.Spec.Volumes))
for i := range pod.Spec.Volumes {
@ -329,9 +340,6 @@ func TestStatefulPodControlUpdatePodStorageFailure(t *testing.T) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
})
fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewNotFound(action.GetResource().GroupResource(), action.GetResource().Resource)
})
fakeClient.AddReactor("create", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -359,7 +367,7 @@ func TestStatefulPodControlUpdatePodConflictSuccess(t *testing.T) {
gooPod.Name = "goo-0"
indexer.Add(gooPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
conflict := false
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
@ -395,7 +403,7 @@ func TestStatefulPodControlUpdatePodConflictFailure(t *testing.T) {
updatedPod.Annotations[podapi.PodHostnameAnnotation] = "wrong"
indexer.Add(updatedPod)
podLister := corelisters.NewPodLister(indexer)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, podLister, nil, recorder)
fakeClient.AddReactor("update", "pods", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), pod.Name, errors.New("conflict"))
@ -418,7 +426,7 @@ func TestStatefulPodControlDeletesStatefulPod(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, nil
})
@ -438,7 +446,7 @@ func TestStatefulPodControlDeleteFailure(t *testing.T) {
set := newStatefulSet(3)
pod := newStatefulSetPod(set, 0)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("delete", "pods", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -457,7 +465,7 @@ func TestStatefulPodControlUpdatesSetStatus(t *testing.T) {
recorder := record.NewFakeRecorder(10)
set := newStatefulSet(3)
fakeClient := &fake.Clientset{}
control := NewRealStatefulPodControl(fakeClient, nil, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, nil, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), nil
@ -481,7 +489,7 @@ func TestStatefulPodControlUpdateReplicasFailure(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, apierrors.NewInternalError(errors.New("API server down"))
})
@ -502,7 +510,7 @@ func TestStatefulPodControlUpdateReplicasConflict(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
if !conflict {
@ -531,7 +539,7 @@ func TestStatefulPodControlUpdateReplicasConflictFailure(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
indexer.Add(set)
setLister := appslisters.NewStatefulSetLister(indexer)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, recorder)
control := NewRealStatefulPodControl(fakeClient, setLister, nil, nil, recorder)
fakeClient.AddReactor("update", "statefulsets", func(action core.Action) (bool, runtime.Object, error) {
update := action.(core.UpdateAction)
return true, update.GetObject(), apierrors.NewConflict(action.GetResource().GroupResource(), set.Name, errors.New("Object already exists"))

View File

@ -72,6 +72,7 @@ type StatefulSetController struct {
func NewStatefulSetController(
podInformer coreinformers.PodInformer,
setInformer appsinformers.StatefulSetInformer,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
kubeClient clientset.Interface,
) *StatefulSetController {
eventBroadcaster := record.NewBroadcaster()
@ -81,8 +82,16 @@ func NewStatefulSetController(
ssc := &StatefulSetController{
kubeClient: kubeClient,
control: NewDefaultStatefulSetControl(NewRealStatefulPodControl(kubeClient, setInformer.Lister(), podInformer.Lister(), recorder)),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
control: NewDefaultStatefulSetControl(
NewRealStatefulPodControl(
kubeClient,
setInformer.Lister(),
podInformer.Lister(),
pvcInformer.Lister(),
recorder,
),
),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "statefulset"),
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{

View File

@ -34,7 +34,6 @@ import (
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
appsinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/apps/v1beta1"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
listers "k8s.io/kubernetes/pkg/client/legacylisters"
appslisters "k8s.io/kubernetes/pkg/client/listers/apps/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
@ -560,7 +559,7 @@ func (rt *requestTracker) reset() {
type fakeStatefulPodControl struct {
podsLister corelisters.PodLister
claimsLister listers.StoreToPersistentVolumeClaimLister
claimsLister corelisters.PersistentVolumeClaimLister
setsLister appslisters.StatefulSetLister
podsIndexer cache.Indexer
claimsIndexer cache.Indexer
@ -572,11 +571,10 @@ type fakeStatefulPodControl struct {
}
func newFakeStatefulPodControl(podInformer coreinformers.PodInformer, setInformer appsinformers.StatefulSetInformer) *fakeStatefulPodControl {
claimsIndexer := cache.NewIndexer(controller.KeyFunc,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
claimsIndexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
return &fakeStatefulPodControl{
podInformer.Lister(),
listers.StoreToPersistentVolumeClaimLister{Indexer: claimsIndexer},
corelisters.NewPersistentVolumeClaimLister(claimsIndexer),
setInformer.Lister(),
podInformer.Informer().GetIndexer(),
claimsIndexer,

View File

@ -352,6 +352,7 @@ func newFakeStatefulSetController() (*StatefulSetController, *fakeStatefulPodCon
ssc := NewStatefulSetController(
informerFactory.Core().V1().Pods(),
informerFactory.Apps().V1beta1().StatefulSets(),
informerFactory.Core().V1().PersistentVolumeClaims(),
client,
)
ssc.podListerSynced = alwaysReady