PetSet alpha controller

pull/6/head
Prashanth Balasubramanian 2016-04-25 12:24:40 -07:00
parent 93e3df8e55
commit 6bc3052551
24 changed files with 2761 additions and 40 deletions

View File

@ -56,6 +56,7 @@ import (
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
petset "k8s.io/kubernetes/pkg/controller/petset"
"k8s.io/kubernetes/pkg/controller/podautoscaler"
"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
replicaset "k8s.io/kubernetes/pkg/controller/replicaset"
@ -351,6 +352,24 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
}
}
groupVersion = "apps/v1alpha1"
resources, found = resourceMap[groupVersion]
glog.Infof("Attempting to start petset, full resource map %+v", resourceMap)
if containsVersion(versions, groupVersion) && found {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "petsets") {
glog.Infof("Starting PetSet controller")
resyncPeriod := ResyncPeriod(s)()
go petset.NewPetSetController(
podInformer,
// TODO: Switch to using clientset
kubeClient,
resyncPeriod,
).Run(1, wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}
}
volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfiguration)
provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfiguration)
if err != nil {

View File

@ -541,6 +541,7 @@ _kubectl_describe()
must_have_one_noun+=("node")
must_have_one_noun+=("persistentvolume")
must_have_one_noun+=("persistentvolumeclaim")
must_have_one_noun+=("petset")
must_have_one_noun+=("pod")
must_have_one_noun+=("replicaset")
must_have_one_noun+=("replicationcontroller")
@ -570,6 +571,7 @@ _kubectl_describe()
noun_aliases+=("ns")
noun_aliases+=("persistentvolumeclaims")
noun_aliases+=("persistentvolumes")
noun_aliases+=("petsets")
noun_aliases+=("po")
noun_aliases+=("pods")
noun_aliases+=("pv")

View File

@ -17,6 +17,8 @@ limitations under the License.
package validation
import (
"reflect"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
unversionedvalidation "k8s.io/kubernetes/pkg/api/unversioned/validation"
@ -31,6 +33,9 @@ import (
// Prefix indicates this name will be used as part of generation, in which case
// trailing dashes are allowed.
func ValidatePetSetName(name string, prefix bool) (bool, string) {
// TODO: Validate that there's room for the suffix inserted by the pets.
// Currently this is just "-index". In the future we may allow a user
// specified list of suffixes and we need to validate the longest one.
return apivalidation.NameIsDNSSubdomain(name, prefix)
}
@ -96,8 +101,22 @@ func ValidatePetSet(petSet *apps.PetSet) field.ErrorList {
// ValidatePetSetUpdate tests if required fields in the PetSet are set.
func ValidatePetSetUpdate(petSet, oldPetSet *apps.PetSet) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, apivalidation.ValidateObjectMetaUpdate(&petSet.ObjectMeta, &oldPetSet.ObjectMeta, field.NewPath("metadata"))...)
allErrs = append(allErrs, ValidatePetSetSpec(&petSet.Spec, field.NewPath("spec"))...)
// TODO: For now we're taking the safe route and disallowing all updates to spec except for Spec.Replicas.
// Enable on a case by case basis.
restoreReplicas := petSet.Spec.Replicas
petSet.Spec.Replicas = oldPetSet.Spec.Replicas
// The generation changes for this update
restoreGeneration := petSet.Generation
petSet.Generation = oldPetSet.Generation
if !reflect.DeepEqual(petSet, oldPetSet) {
allErrs = append(allErrs, field.Forbidden(field.NewPath("spec"), "updates to petset spec for fields other than 'replicas' are forbidden."))
}
petSet.Spec.Replicas = restoreReplicas
petSet.Generation = restoreGeneration
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(petSet.Spec.Replicas), field.NewPath("spec", "replicas"))...)
return allErrs
}
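The compare-after-restore trick above whitelists Spec.Replicas without enumerating every forbidden field: copy the mutable field from old to new, deep-compare the rest, then restore. A minimal, self-contained sketch of the same idiom, using a hypothetical spec type rather than the real apps.PetSetSpec (it compares a scratch copy instead of mutating the argument in place):

package main

import (
	"fmt"
	"reflect"
)

// spec stands in for a resource spec in which only Replicas may change.
type spec struct {
	Replicas int
	Image    string
}

// validateUpdate copies the whitelisted field from oldSpec into a scratch
// copy of newSpec; any remaining difference means a forbidden field changed.
func validateUpdate(newSpec, oldSpec spec) error {
	scratch := newSpec
	scratch.Replicas = oldSpec.Replicas
	if !reflect.DeepEqual(scratch, oldSpec) {
		return fmt.Errorf("updates to fields other than Replicas are forbidden")
	}
	return nil
}

func main() {
	old := spec{Replicas: 1, Image: "nginx"}
	fmt.Println(validateUpdate(spec{Replicas: 3, Image: "nginx"}, old)) // <nil>
	fmt.Println(validateUpdate(spec{Replicas: 3, Image: "redis"}, old)) // forbidden
}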

View File

@ -276,23 +276,6 @@ func TestValidatePetSetUpdate(t *testing.T) {
},
},
},
{
old: apps.PetSet{
ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault},
Spec: apps.PetSetSpec{
Selector: &unversioned.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
},
},
update: apps.PetSet{
ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault},
Spec: apps.PetSetSpec{
Replicas: 1,
Selector: &unversioned.LabelSelector{MatchLabels: validLabels},
Template: readWriteVolumePodTemplate.Template,
},
},
},
}
for _, successCase := range successCases {
successCase.old.ObjectMeta.ResourceVersion = "1"
@ -319,6 +302,23 @@ func TestValidatePetSetUpdate(t *testing.T) {
},
},
},
"updates to a field other than spec.Replicas": {
old: apps.PetSet{
ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault},
Spec: apps.PetSetSpec{
Selector: &unversioned.LabelSelector{MatchLabels: validLabels},
Template: validPodTemplate.Template,
},
},
update: apps.PetSet{
ObjectMeta: api.ObjectMeta{Name: "abc", Namespace: api.NamespaceDefault},
Spec: apps.PetSetSpec{
Replicas: 1,
Selector: &unversioned.LabelSelector{MatchLabels: validLabels},
Template: readWriteVolumePodTemplate.Template,
},
},
},
"invalid selector": {
old: apps.PetSet{
ObjectMeta: api.ObjectMeta{Name: "", Namespace: api.NamespaceDefault},

View File

@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/labels"
@ -557,3 +558,67 @@ func (s *StoreToPVCFetcher) GetPersistentVolumeClaimInfo(namespace string, id st
return o.(*api.PersistentVolumeClaim), nil
}
// StoreToPetSetLister adds List and Exists methods to a store. The store must contain only PetSets.
type StoreToPetSetLister struct {
Store
}
// Exists checks if the given PetSet exists in the store.
func (s *StoreToPetSetLister) Exists(ps *apps.PetSet) (bool, error) {
_, exists, err := s.Store.Get(ps)
if err != nil {
return false, err
}
return exists, nil
}
// List lists all PetSets in the store.
func (s *StoreToPetSetLister) List() (psList []apps.PetSet, err error) {
for _, ps := range s.Store.List() {
psList = append(psList, *(ps.(*apps.PetSet)))
}
return psList, nil
}
type storePetSetsNamespacer struct {
store Store
namespace string
}
func (s *StoreToPetSetLister) PetSets(namespace string) storePetSetsNamespacer {
return storePetSetsNamespacer{s.Store, namespace}
}
// GetPodPetSets returns a list of PetSets managing a pod. Returns an error if a selector is invalid or no matching PetSets are found.
func (s *StoreToPetSetLister) GetPodPetSets(pod *api.Pod) (psList []apps.PetSet, err error) {
var selector labels.Selector
var ps apps.PetSet
if len(pod.Labels) == 0 {
err = fmt.Errorf("no PetSets found for pod %v because it has no labels", pod.Name)
return
}
for _, m := range s.Store.List() {
ps = *m.(*apps.PetSet)
if ps.Namespace != pod.Namespace {
continue
}
selector, err = unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
if err != nil {
err = fmt.Errorf("invalid selector: %v", err)
return
}
// If a PetSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
psList = append(psList, ps)
}
if len(psList) == 0 {
err = fmt.Errorf("could not find PetSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}
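GetPodPetSets is a linear scan: skip PetSets in other namespaces, treat a nil or empty selector as matching nothing, and error out when nothing matches. A stripped-down sketch of that matching loop over plain label maps (toy types standing in for cache.Store and unversioned.LabelSelector):

package main

import "fmt"

type petSet struct {
	Name      string
	Namespace string
	Selector  map[string]string // MatchLabels only, for brevity
}

// matches reports whether every selector key/value appears in podLabels.
// An empty selector matches nothing, mirroring the lister's behavior.
func matches(selector, podLabels map[string]string) bool {
	if len(selector) == 0 {
		return false
	}
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

func petSetsForPod(all []petSet, ns string, podLabels map[string]string) ([]petSet, error) {
	out := []petSet{}
	for _, ps := range all {
		if ps.Namespace != ns || !matches(ps.Selector, podLabels) {
			continue
		}
		out = append(out, ps)
	}
	if len(out) == 0 {
		return nil, fmt.Errorf("no PetSet found for labels %v", podLabels)
	}
	return out, nil
}

func main() {
	sets := []petSet{
		{Name: "zk", Namespace: "default", Selector: map[string]string{"app": "zk"}},
		{Name: "zk", Namespace: "other", Selector: map[string]string{"app": "zk"}},
	}
	fmt.Println(petSetsForPod(sets, "default", map[string]string{"app": "zk", "pod": "zk-0"}))
}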

View File

@ -32,7 +32,7 @@ type AppsClient struct {
*restclient.RESTClient
}
func (c *AppsClient) PetSet(namespace string) PetSetInterface {
func (c *AppsClient) PetSets(namespace string) PetSetInterface {
return newPetSet(c, namespace)
}

View File

@ -24,7 +24,7 @@ import (
// PetSetNamespacer has methods to work with PetSet resources in a namespace
type PetSetNamespacer interface {
PetSet(namespace string) PetSetInterface
PetSets(namespace string) PetSetInterface
}
// PetSetInterface exposes methods to work on PetSet resources.

View File

@ -56,7 +56,7 @@ func TestListPetSets(t *testing.T) {
},
},
}
receivedRSList, err := c.Setup(t).Apps().PetSet(ns).List(api.ListOptions{})
receivedRSList, err := c.Setup(t).Apps().PetSets(ns).List(api.ListOptions{})
c.Validate(t, receivedRSList, err)
}
@ -81,14 +81,14 @@ func TestGetPetSet(t *testing.T) {
},
},
}
receivedRS, err := c.Setup(t).Apps().PetSet(ns).Get("foo")
receivedRS, err := c.Setup(t).Apps().PetSets(ns).Get("foo")
c.Validate(t, receivedRS, err)
}
func TestGetPetSetWithNoName(t *testing.T) {
ns := api.NamespaceDefault
c := &simple.Client{Error: true}
receivedPod, err := c.Setup(t).Apps().PetSet(ns).Get("")
receivedPod, err := c.Setup(t).Apps().PetSets(ns).Get("")
if (err != nil) && (err.Error() != simple.NameRequiredError) {
t.Errorf("Expected error: %v, but got %v", simple.NameRequiredError, err)
}
@ -120,7 +120,7 @@ func TestUpdatePetSet(t *testing.T) {
},
},
}
receivedRS, err := c.Setup(t).Apps().PetSet(ns).Update(requestRS)
receivedRS, err := c.Setup(t).Apps().PetSets(ns).Update(requestRS)
c.Validate(t, receivedRS, err)
}
@ -130,7 +130,7 @@ func TestDeletePetSet(t *testing.T) {
Request: simple.Request{Method: "DELETE", Path: testapi.Apps.ResourcePath(getPetSetResourceName(), ns, "foo"), Query: simple.BuildQueryValues(nil)},
Response: simple.Response{StatusCode: 200},
}
err := c.Setup(t).Apps().PetSet(ns).Delete("foo", nil)
err := c.Setup(t).Apps().PetSets(ns).Delete("foo", nil)
c.Validate(t, nil, err)
}
@ -158,7 +158,7 @@ func TestCreatePetSet(t *testing.T) {
},
},
}
receivedRS, err := c.Setup(t).Apps().PetSet(ns).Create(requestRS)
receivedRS, err := c.Setup(t).Apps().PetSets(ns).Create(requestRS)
c.Validate(t, receivedRS, err)
}

View File

@ -413,15 +413,15 @@ func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *a
return r.createPods(nodeName, namespace, template, object)
}
func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
func GetPodFromTemplate(template *api.PodTemplateSpec, parentObject runtime.Object) (*api.Pod, error) {
desiredLabels := getPodsLabelSet(template)
desiredAnnotations, err := getPodsAnnotationSet(template, object)
desiredAnnotations, err := getPodsAnnotationSet(template, parentObject)
if err != nil {
return err
return nil, err
}
accessor, err := meta.Accessor(object)
accessor, err := meta.Accessor(parentObject)
if err != nil {
return fmt.Errorf("object does not have ObjectMeta, %v", err)
return nil, fmt.Errorf("parentObject does not have ObjectMeta, %v", err)
}
prefix := getPodsPrefix(accessor.GetName())
@ -433,7 +433,15 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod
},
}
if err := api.Scheme.Convert(&template.Spec, &pod.Spec); err != nil {
return fmt.Errorf("unable to convert pod template: %v", err)
return nil, fmt.Errorf("unable to convert pod template: %v", err)
}
return pod, nil
}
func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
pod, err := GetPodFromTemplate(template, object)
if err != nil {
return err
}
if len(nodeName) != 0 {
pod.Spec.NodeName = nodeName
@ -445,6 +453,11 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *api.Pod
r.Recorder.Eventf(object, api.EventTypeWarning, "FailedCreate", "Error creating: %v", err)
return fmt.Errorf("unable to create pods: %v", err)
} else {
accessor, err := meta.Accessor(object)
if err != nil {
glog.Errorf("parentObject does not have ObjectMeta, %v", err)
return nil
}
glog.V(4).Infof("Controller %v created pod %v", accessor.GetName(), newPod.Name)
r.Recorder.Eventf(object, api.EventTypeNormal, "SuccessfulCreate", "Created pod: %v", newPod.Name)
}

View File

@ -117,6 +117,7 @@ func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time
glog.Errorf("Error casting object to PersistentVolume: %v", obj)
return
}
recycler.reclaimVolume(pv)
recycler.removeReleasedVolume(pv)
},
},

View File

@ -0,0 +1,324 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api"
api_pod "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
"speter.net/go/exp/math/dec/inf"
)
func dec(i int64, exponent int) *inf.Dec {
return inf.NewDec(i, inf.Scale(-exponent))
}
func newPVC(name string) api.PersistentVolumeClaim {
return api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PersistentVolumeClaimSpec{
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceStorage: resource.Quantity{
Amount: dec(1, 0),
Format: resource.BinarySI,
},
},
},
},
}
}
func newPetSetWithVolumes(replicas int, name string, petMounts []api.VolumeMount, podMounts []api.VolumeMount) *apps.PetSet {
mounts := append(petMounts, podMounts...)
claims := []api.PersistentVolumeClaim{}
for _, m := range petMounts {
claims = append(claims, newPVC(m.Name))
}
vols := []api.Volume{}
for _, m := range podMounts {
vols = append(vols, api.Volume{
Name: m.Name,
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: fmt.Sprintf("/tmp/%v", m.Name),
},
},
})
}
return &apps.PetSet{
TypeMeta: unversioned.TypeMeta{
Kind: "PetSet",
APIVersion: "apps/v1beta1",
},
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: api.NamespaceDefault,
UID: types.UID("test"),
},
Spec: apps.PetSetSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Replicas: replicas,
Template: api.PodTemplateSpec{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "nginx",
Image: "nginx",
VolumeMounts: mounts,
},
},
Volumes: vols,
},
},
VolumeClaimTemplates: claims,
ServiceName: "governingsvc",
},
}
}
func runningPod(ns, name string) *api.Pod {
p := &api.Pod{Status: api.PodStatus{Phase: api.PodRunning}}
p.Namespace = ns
p.Name = name
return p
}
func newPodList(ps *apps.PetSet, num int) []*api.Pod {
// knownPods are pods in the system
knownPods := []*api.Pod{}
for i := 0; i < num; i++ {
k, _ := newPCB(fmt.Sprintf("%v", i), ps)
knownPods = append(knownPods, k.pod)
}
return knownPods
}
func newPetSet(replicas int) *apps.PetSet {
petMounts := []api.VolumeMount{
{Name: "datadir", MountPath: "/tmp/zookeeper"},
}
podMounts := []api.VolumeMount{
{Name: "home", MountPath: "/home"},
}
return newPetSetWithVolumes(replicas, "foo", petMounts, podMounts)
}
func checkPodForMount(pod *api.Pod, mountName string) error {
for _, c := range pod.Spec.Containers {
for _, v := range c.VolumeMounts {
if v.Name == mountName {
return nil
}
}
}
return fmt.Errorf("Found volume but no associated mount %v in pod %v", mountName, pod.Name)
}
func newFakePetClient() *fakePetClient {
return &fakePetClient{
pets: []*pcb{},
claims: []api.PersistentVolumeClaim{},
recorder: &record.FakeRecorder{},
petHealthChecker: &defaultPetHealthChecker{},
}
}
type fakePetClient struct {
pets []*pcb
claims []api.PersistentVolumeClaim
petsCreated, petsDeleted int
claimsCreated, claimsDeleted int
recorder record.EventRecorder
petHealthChecker
}
// Delete fakes pet client deletion.
func (f *fakePetClient) Delete(p *pcb) error {
pets := []*pcb{}
found := false
for i, pet := range f.pets {
if p.pod.Name == pet.pod.Name {
found = true
f.recorder.Eventf(pet.parent, api.EventTypeNormal, "SuccessfulDelete", "pet: %v", pet.pod.Name)
continue
}
pets = append(pets, f.pets[i])
}
if !found {
// TODO: Return proper not found error
return fmt.Errorf("Delete failed: pet %v doesn't exist", p.pod.Name)
}
f.pets = pets
f.petsDeleted++
return nil
}
// Get fakes getting pets.
func (f *fakePetClient) Get(p *pcb) (*pcb, bool, error) {
for i, pet := range f.pets {
if p.pod.Name == pet.pod.Name {
return f.pets[i], true, nil
}
}
return nil, false, nil
}
// Create fakes pet creation.
func (f *fakePetClient) Create(p *pcb) error {
for _, pet := range f.pets {
if p.pod.Name == pet.pod.Name {
return fmt.Errorf("Create failed: pet %v already exists", p.pod.Name)
}
}
f.recorder.Eventf(p.parent, api.EventTypeNormal, "SuccessfulCreate", "pet: %v", p.pod.Name)
f.pets = append(f.pets, p)
f.petsCreated++
return nil
}
// Update fakes pet updates.
func (f *fakePetClient) Update(expected, wanted *pcb) error {
found := false
pets := []*pcb{}
for i, pet := range f.pets {
if wanted.pod.Name == pet.pod.Name {
f.pets[i].pod.Annotations[api_pod.PodHostnameAnnotation] = wanted.pod.Annotations[api_pod.PodHostnameAnnotation]
f.pets[i].pod.Annotations[api_pod.PodSubdomainAnnotation] = wanted.pod.Annotations[api_pod.PodSubdomainAnnotation]
f.pets[i].pod.Spec = wanted.pod.Spec
found = true
}
pets = append(pets, f.pets[i])
}
f.pets = pets
if !found {
return fmt.Errorf("Cannot update pet %v not found", wanted.pod.Name)
}
// TODO: Delete pvcs/volumes that are in wanted but not in expected.
return nil
}
func (f *fakePetClient) getPodList() []*api.Pod {
p := []*api.Pod{}
for i, pet := range f.pets {
if pet.pod == nil {
continue
}
p = append(p, f.pets[i].pod)
}
return p
}
func (f *fakePetClient) deletePetAtIndex(index int) {
p := []*pcb{}
for i := range f.pets {
if i != index {
p = append(p, f.pets[i])
}
}
f.pets = p
}
func (f *fakePetClient) setHealthy(index int) error {
if index >= len(f.pets) {
return fmt.Errorf("Index out of range, len %v index %v", len(f.pets), index)
}
f.pets[index].pod.Status.Phase = api.PodRunning
f.pets[index].pod.Annotations[PetSetInitAnnotation] = "true"
return nil
}
// isHealthy is a convenience wrapper around the default health checker.
func (f *fakePetClient) isHealthy(pod *api.Pod) bool {
return f.petHealthChecker.isHealthy(pod)
}
func (f *fakePetClient) setDeletionTimestamp(index int) error {
if index >= len(f.pets) {
return fmt.Errorf("Index out of range, len %v index %v", len(f.pets), index)
}
f.pets[index].pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
return nil
}
// SyncPVCs fakes pvc syncing.
func (f *fakePetClient) SyncPVCs(pet *pcb) error {
v := pet.pvcs
updateClaims := map[string]api.PersistentVolumeClaim{}
for i, update := range v {
updateClaims[update.Name] = v[i]
}
claimList := []api.PersistentVolumeClaim{}
for i, existing := range f.claims {
if update, ok := updateClaims[existing.Name]; ok {
claimList = append(claimList, update)
delete(updateClaims, existing.Name)
} else {
claimList = append(claimList, f.claims[i])
}
}
for _, remaining := range updateClaims {
claimList = append(claimList, remaining)
f.claimsCreated++
f.recorder.Eventf(pet.parent, api.EventTypeNormal, "SuccessfulCreate", "pvc: %v", remaining.Name)
}
f.claims = claimList
return nil
}
// DeletePVCs fakes pvc deletion.
func (f *fakePetClient) DeletePVCs(pet *pcb) error {
claimsToDelete := pet.pvcs
deleteClaimNames := sets.NewString()
for _, c := range claimsToDelete {
deleteClaimNames.Insert(c.Name)
}
pvcs := []api.PersistentVolumeClaim{}
for i, existing := range f.claims {
if deleteClaimNames.Has(existing.Name) {
deleteClaimNames.Delete(existing.Name)
f.claimsDeleted++
f.recorder.Eventf(pet.parent, api.EventTypeNormal, "SuccessfulDelete", "pvc: %v", existing.Name)
continue
}
pvcs = append(pvcs, f.claims[i])
}
f.claims = pvcs
if deleteClaimNames.Len() != 0 {
return fmt.Errorf("Claims %+v don't exist. Failed deletion.", deleteClaimNames)
}
return nil
}

View File

@ -0,0 +1,247 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"crypto/md5"
"fmt"
"sort"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
podapi "k8s.io/kubernetes/pkg/api/pod"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/util/sets"
)
// identityMapper is an interface for assigning identities to a pet.
// All existing identity mappers just append "-(index)" to the petset name to
// generate a unique identity. This is used in claims/DNS/hostname/petname
// etc. There's a more elegant way to achieve this mapping, but we're
// taking the simplest route till we have data on whether users will need
// more customization.
// Note that running a single identity mapper is not guaranteed to give
// your pet a unique identity. You must run them all. Order doesn't matter.
type identityMapper interface {
// SetIdentity takes an id and assigns the given pet an identity based
// on the pet set spec. The id must be unique amongst members of the
// pet set.
SetIdentity(id string, pet *api.Pod)
// Identity returns the identity of the pet.
Identity(pod *api.Pod) string
}
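Since identity is the union of what every mapper assigns, a pet is only fully identified after all mappers have run, in any order. A toy sketch of the composition pattern (simplified pod type, illustrative only):

package main

import "fmt"

type pod struct {
	Name        string
	Annotations map[string]string
}

type identityMapper interface {
	SetIdentity(id string, p *pod)
	Identity(p *pod) string
}

// nameMapper assigns "<base>-<id>" as the pod name.
type nameMapper struct{ base string }

func (n nameMapper) SetIdentity(id string, p *pod) { p.Name = n.base + "-" + id }
func (n nameMapper) Identity(p *pod) string        { return p.Name }

// hostnameMapper records a hostname annotation, like NetworkIdentityMapper.
type hostnameMapper struct{ base string }

func (h hostnameMapper) SetIdentity(id string, p *pod) {
	p.Annotations["hostname"] = h.base + "-" + id
}
func (h hostnameMapper) Identity(p *pod) string { return p.Annotations["hostname"] }

func main() {
	p := &pod{Annotations: map[string]string{}}
	mappers := []identityMapper{nameMapper{"web"}, hostnameMapper{"web"}}
	for _, m := range mappers { // all must run; order doesn't matter
		m.SetIdentity("0", p)
	}
	id := ""
	for _, m := range mappers { // this concatenation is what identityHash md5s
		id += m.Identity(p)
	}
	fmt.Println(id)
}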
func newIdentityMappers(ps *apps.PetSet) []identityMapper {
return []identityMapper{
&NameIdentityMapper{ps},
&NetworkIdentityMapper{ps},
&VolumeIdentityMapper{ps},
}
}
// NetworkIdentityMapper assigns network identity to pets.
type NetworkIdentityMapper struct {
ps *apps.PetSet
}
// SetIdentity sets network identity on the pet.
func (n *NetworkIdentityMapper) SetIdentity(id string, pet *api.Pod) {
pet.Annotations[podapi.PodHostnameAnnotation] = fmt.Sprintf("%v-%v", n.ps.Name, id)
pet.Annotations[podapi.PodSubdomainAnnotation] = n.ps.Spec.ServiceName
return
}
// Identity returns the network identity of the pet.
func (n *NetworkIdentityMapper) Identity(pet *api.Pod) string {
return n.String(pet)
}
// String is a string function for the network identity of the pet.
func (n *NetworkIdentityMapper) String(pet *api.Pod) string {
hostname := pet.Annotations[podapi.PodHostnameAnnotation]
subdomain := pet.Annotations[podapi.PodSubdomainAnnotation]
return strings.Join([]string{hostname, subdomain, n.ps.Namespace}, ".")
}
// VolumeIdentityMapper assigns storage identity to pets.
type VolumeIdentityMapper struct {
ps *apps.PetSet
}
// SetIdentity sets storage identity on the pet.
func (v *VolumeIdentityMapper) SetIdentity(id string, pet *api.Pod) {
petVolumes := []api.Volume{}
petClaims := v.GetClaims(id)
// These volumes will all go down with the pod. If a name matches one of
// the claims in the pet set, it gets clobbered.
podVolumes := map[string]api.Volume{}
for _, podVol := range pet.Spec.Volumes {
podVolumes[podVol.Name] = podVol
}
// Insert claims for the idempotent petSet volumes
for name, claim := range petClaims {
// Volumes on a pet for which there are no associated claims on the
// petset are pod local, and die with the pod.
podVol, ok := podVolumes[name]
if ok {
// TODO: Validate and reject this.
glog.V(4).Infof("Overwriting existing volume source %v", podVol.Name)
}
newVol := api.Volume{
Name: name,
VolumeSource: api.VolumeSource{
PersistentVolumeClaim: &api.PersistentVolumeClaimVolumeSource{
ClaimName: claim.Name,
// TODO: Use source definition to set this value when we have one.
ReadOnly: false,
},
},
}
petVolumes = append(petVolumes, newVol)
}
// Transfer any ephemeral pod volumes
for name, vol := range podVolumes {
if _, ok := petClaims[name]; !ok {
petVolumes = append(petVolumes, vol)
}
}
pet.Spec.Volumes = petVolumes
return
}
// Identity returns the storage identity of the pet.
func (v *VolumeIdentityMapper) Identity(pet *api.Pod) string {
// TODO: Make this a hash?
return v.String(pet)
}
// String is a string function for the storage identity of the pet.
func (v *VolumeIdentityMapper) String(pet *api.Pod) string {
ids := []string{}
petVols := sets.NewString()
for _, petVol := range v.ps.Spec.VolumeClaimTemplates {
petVols.Insert(petVol.Name)
}
for _, podVol := range pet.Spec.Volumes {
// Volumes on a pet for which there are no associated claims on the
// petset are pod local, and die with the pod.
if !petVols.Has(podVol.Name) {
continue
}
if podVol.VolumeSource.PersistentVolumeClaim == nil {
// TODO: Is this a part of the identity?
ids = append(ids, fmt.Sprintf("%v:None", podVol.Name))
continue
}
ids = append(ids, fmt.Sprintf("%v:%v", podVol.Name, podVol.VolumeSource.PersistentVolumeClaim.ClaimName))
}
sort.Strings(ids)
return strings.Join(ids, "")
}
// GetClaims returns the volume claims associated with the given id.
// The claims belong to the petset. The id should be unique within a petset.
func (v *VolumeIdentityMapper) GetClaims(id string) map[string]api.PersistentVolumeClaim {
petClaims := map[string]api.PersistentVolumeClaim{}
for _, pvc := range v.ps.Spec.VolumeClaimTemplates {
claim := pvc
// TODO: Name length checking in validation.
claim.Name = fmt.Sprintf("%v-%v-%v", claim.Name, v.ps.Name, id)
claim.Namespace = v.ps.Namespace
claim.Labels = v.ps.Spec.Selector.MatchLabels
// TODO: We're assuming that the claim template has a volume QoS key, eg:
// volume.alpha.kubernetes.io/storage-class: anything
petClaims[pvc.Name] = claim
}
return petClaims
}
// GetClaimsForPet returns the pvcs for the given pet.
func (v *VolumeIdentityMapper) GetClaimsForPet(pet *api.Pod) []api.PersistentVolumeClaim {
// Strip out the "-(index)" from the pet name and use it to generate
// claim names.
id := strings.Split(pet.Name, "-")
petID := id[len(id)-1]
pvcs := []api.PersistentVolumeClaim{}
for _, pvc := range v.GetClaims(petID) {
pvcs = append(pvcs, pvc)
}
return pvcs
}
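So the claim name is always templateName-petSetName-id, and GetClaimsForPet recovers the id from the last dash-separated segment of the pet name. A quick round-trip of that naming scheme, with the values used in the tests below:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Forward, per GetClaims: template "datadir", PetSet "foo", id "0".
	claim := fmt.Sprintf("%v-%v-%v", "datadir", "foo", "0")

	// Reverse, per GetClaimsForPet: pet "foo-0" yields id "0", which
	// regenerates the same claim names.
	parts := strings.Split("foo-0", "-")
	id := parts[len(parts)-1]

	fmt.Println(claim, id) // datadir-foo-0 0
}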
// NameIdentityMapper assigns names to pets.
// It also puts the pet in the same namespace as the parent.
type NameIdentityMapper struct {
ps *apps.PetSet
}
// SetIdentity sets the pet namespace and name.
func (n *NameIdentityMapper) SetIdentity(id string, pet *api.Pod) {
pet.Name = fmt.Sprintf("%v-%v", n.ps.Name, id)
pet.Namespace = n.ps.Namespace
return
}
// Identity returns the name identity of the pet.
func (n *NameIdentityMapper) Identity(pet *api.Pod) string {
return n.String(pet)
}
// String is a string function for the name identity of the pet.
func (n *NameIdentityMapper) String(pet *api.Pod) string {
return fmt.Sprintf("%v/%v", pet.Namespace, pet.Name)
}
// identityHash computes a hash of the pet by running all the above identity
// mappers.
func identityHash(ps *apps.PetSet, pet *api.Pod) string {
id := ""
for _, idMapper := range newIdentityMappers(ps) {
id += idMapper.Identity(pet)
}
return fmt.Sprintf("%x", md5.Sum([]byte(id)))
}
// copyPetID gives the realPet the same identity as the expectedPet.
// Note that this is *not* a literal copy, but a copy of the fields that
// contribute to the pet's identity. The returned boolean 'needsUpdate' will
// be false if the realPet already has the same identity as the expectedPet.
func copyPetID(realPet, expectedPet *pcb) (pod api.Pod, needsUpdate bool, err error) {
if realPet.pod == nil || expectedPet.pod == nil {
return pod, false, fmt.Errorf("Need a valid to and from pet for copy")
}
if realPet.parent.UID != expectedPet.parent.UID {
return pod, false, fmt.Errorf("Cannot copy pets with different parents")
}
ps := realPet.parent
if identityHash(ps, realPet.pod) == identityHash(ps, expectedPet.pod) {
return *realPet.pod, false, nil
}
copyPod := *realPet.pod
// This is the easiest way to give an identity to a pod. It won't work
// when we stop using names for id.
for _, idMapper := range newIdentityMappers(ps) {
idMapper.SetIdentity(expectedPet.id, &copyPod)
}
return copyPod, true, nil
}

View File

@ -0,0 +1,179 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"reflect"
"strings"
"k8s.io/kubernetes/pkg/api"
api_pod "k8s.io/kubernetes/pkg/api/pod"
"testing"
)
func TestPetIDName(t *testing.T) {
replicas := 3
ps := newPetSet(replicas)
for i := 0; i < replicas; i++ {
petName := fmt.Sprintf("%v-%d", ps.Name, i)
pcb, err := newPCB(fmt.Sprintf("%d", i), ps)
if err != nil {
t.Fatalf("Failed to generate pet %v", err)
}
pod := pcb.pod
if pod.Name != petName || pod.Namespace != ps.Namespace {
t.Errorf("Wrong name identity, expected %v", pcb.pod.Name)
}
}
}
func TestPetIDDNS(t *testing.T) {
replicas := 3
ps := newPetSet(replicas)
for i := 0; i < replicas; i++ {
petName := fmt.Sprintf("%v-%d", ps.Name, i)
petSubdomain := ps.Spec.ServiceName
pcb, err := newPCB(fmt.Sprintf("%d", i), ps)
pod := pcb.pod
if err != nil {
t.Fatalf("Failed to generate pet %v", err)
}
if hostname, ok := pod.Annotations[api_pod.PodHostnameAnnotation]; !ok || hostname != petName {
t.Errorf("Wrong hostname %v, expected %v", hostname, petName)
}
// TODO: Check this against the governing service.
if subdomain, ok := pod.Annotations[api_pod.PodSubdomainAnnotation]; !ok || subdomain != petSubdomain {
t.Errorf("Wrong subdomain %v, expected %v", subdomain, petSubdomain)
}
}
}
func TestPetIDVolume(t *testing.T) {
replicas := 3
ps := newPetSet(replicas)
for i := 0; i < replicas; i++ {
pcb, err := newPCB(fmt.Sprintf("%d", i), ps)
if err != nil {
t.Fatalf("Failed to generate pet %v", err)
}
pod := pcb.pod
petName := fmt.Sprintf("%v-%d", ps.Name, i)
claimName := fmt.Sprintf("datadir-%v", petName)
for _, v := range pod.Spec.Volumes {
switch v.Name {
case "datadir":
c := v.VolumeSource.PersistentVolumeClaim
if c == nil || c.ClaimName != claimName {
t.Fatalf("Unexpected claim %v", c)
}
if err := checkPodForMount(pod, "datadir"); err != nil {
t.Errorf("Expected pod mount: %v", err)
}
case "home":
h := v.VolumeSource.HostPath
if h == nil || h.Path != "/tmp/home" {
t.Errorf("Unexpected modification to hostpath, expected /tmp/home got %+v", h)
}
default:
t.Errorf("Unexpected volume %v", v.Name)
}
}
}
// TODO: Check volume mounts.
}
func TestPetIDVolumeClaims(t *testing.T) {
replicas := 3
ps := newPetSet(replicas)
for i := 0; i < replicas; i++ {
pcb, err := newPCB(fmt.Sprintf("%v", i), ps)
if err != nil {
t.Fatalf("Failed to generate pet %v", err)
}
pvcs := pcb.pvcs
petName := fmt.Sprintf("%v-%d", ps.Name, i)
claimName := fmt.Sprintf("datadir-%v", petName)
if len(pvcs) != 1 || pvcs[0].Name != claimName {
t.Errorf("Wrong pvcs %+v, expected a single claim named %v", pvcs, claimName)
}
}
}
func TestPetIDCrossAssignment(t *testing.T) {
replicas := 3
ps := newPetSet(replicas)
nameMapper := &NameIdentityMapper{ps}
volumeMapper := &VolumeIdentityMapper{ps}
networkMapper := &NetworkIdentityMapper{ps}
// Check that the name is consistent across identity.
for i := 0; i < replicas; i++ {
pet, _ := newPCB(fmt.Sprintf("%v", i), ps)
p := pet.pod
name := strings.Split(nameMapper.Identity(p), "/")[1]
network := networkMapper.Identity(p)
volume := volumeMapper.Identity(p)
petVolume := strings.Split(volume, ":")[1]
if petVolume != fmt.Sprintf("datadir-%v", name) {
t.Errorf("Unexpected pet volume name %v, expected %v", petVolume, name)
}
if network != fmt.Sprintf("%v.%v.%v", name, ps.Spec.ServiceName, ps.Namespace) {
t.Errorf("Unexpected pet network ID %v, expected %v", network, name)
}
t.Logf("[%v] volume: %+v, network: %+v, name: %+v", i, volume, network, name)
}
}
func TestPetIDReset(t *testing.T) {
replicas := 2
ps := newPetSet(replicas)
firstPCB, err := newPCB("1", ps)
secondPCB, err := newPCB("2", ps)
if identityHash(ps, firstPCB.pod) == identityHash(ps, secondPCB.pod) {
t.Fatalf("Failed to generate uniquey identities:\n%+v\n%+v", firstPCB.pod.Spec, secondPCB.pod.Spec)
}
userAdded := api.Volume{
Name: "test",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{Medium: api.StorageMediumMemory},
},
}
firstPCB.pod.Spec.Volumes = append(firstPCB.pod.Spec.Volumes, userAdded)
pod, needsUpdate, err := copyPetID(firstPCB, secondPCB)
if err != nil {
t.Errorf("%v", err)
}
if !needsUpdate {
t.Errorf("expected update since identity of %v was reset", secondPCB.pod.Name)
}
if identityHash(ps, &pod) != identityHash(ps, secondPCB.pod) {
t.Errorf("Failed to copy identity for pod %v -> %v", firstPCB.pod.Name, secondPCB.pod.Name)
}
foundVol := false
for _, v := range pod.Spec.Volumes {
if reflect.DeepEqual(v, userAdded) {
foundVol = true
break
}
}
if !foundVol {
t.Errorf("User added volume was corrupted by reset action.")
}
}

View File

@ -0,0 +1,163 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"sort"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/controller"
)
// newPCB generates a new PCB using the id string as a unique qualifier
func newPCB(id string, ps *apps.PetSet) (*pcb, error) {
petPod, err := controller.GetPodFromTemplate(&ps.Spec.Template, ps)
if err != nil {
return nil, err
}
for _, im := range newIdentityMappers(ps) {
im.SetIdentity(id, petPod)
}
petPVCs := []api.PersistentVolumeClaim{}
vMapper := &VolumeIdentityMapper{ps}
for _, c := range vMapper.GetClaims(id) {
petPVCs = append(petPVCs, c)
}
// TODO: Replace id field with IdentityHash, since id is more than just an index.
return &pcb{pod: petPod, pvcs: petPVCs, id: id, parent: ps}, nil
}
// petQueue is a custom data structure that resembles a queue of pets.
type petQueue struct {
pets []*pcb
idMapper identityMapper
}
// enqueue enqueues the given pet, evicting any pets with the same id
func (pt *petQueue) enqueue(p *pcb) {
if p == nil {
pt.pets = append(pt.pets, nil)
return
}
// Pop an existing pet from the known list, append the new pet to the end.
petList := []*pcb{}
petID := pt.idMapper.Identity(p.pod)
for i := range pt.pets {
if petID != pt.idMapper.Identity(pt.pets[i].pod) {
petList = append(petList, pt.pets[i])
}
}
pt.pets = petList
p.event = syncPet
pt.pets = append(pt.pets, p)
}
// dequeue returns the last element of the queue
func (pt *petQueue) dequeue() *pcb {
if pt.empty() {
glog.Warningf("Dequeue invoked on an empty queue")
return nil
}
l := len(pt.pets) - 1
pet := pt.pets[l]
pt.pets = pt.pets[:l]
return pet
}
// empty returns true if the pet queue is empty.
func (pt *petQueue) empty() bool {
return len(pt.pets) == 0
}
// NewPetQueue returns a queue for tracking pets
func NewPetQueue(ps *apps.PetSet, podList []*api.Pod) *petQueue {
pt := petQueue{pets: []*pcb{}, idMapper: &NameIdentityMapper{ps}}
// Seed the queue with existing pets. Assume all pets are scheduled for
// deletion, enqueuing a pet will "undelete" it. We always want to delete
// from the higher ids, so sort by creation timestamp.
sort.Sort(PodsByCreationTimestamp(podList))
vMapper := VolumeIdentityMapper{ps}
for i := range podList {
pod := podList[i]
pt.pets = append(pt.pets, &pcb{pod: pod, pvcs: vMapper.GetClaimsForPet(pod), parent: ps, event: deletePet, id: fmt.Sprintf("%v", i)})
}
return &pt
}
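A worked trace of the seeding semantics, since they drive scale-down: seed with existing pods foo-0..foo-2 (all presumed deleted), enqueue foo-0 to "undelete" it, then drain from the tail so the highest ids go first. A stdlib-only sketch:

package main

import "fmt"

func main() {
	type item struct{ name, event string }

	// Seed: every known pod starts as a delete candidate (sorted by
	// creation timestamp, oldest first).
	queue := []item{{"foo-0", "delete"}, {"foo-1", "delete"}, {"foo-2", "delete"}}

	// enqueue("foo-0") with replicas=1: evict the matching id, re-append
	// it as a sync event.
	filtered := []item{}
	for _, it := range queue {
		if it.name != "foo-0" {
			filtered = append(filtered, it)
		}
	}
	queue = append(filtered, item{"foo-0", "sync"})

	// dequeue pops from the back: foo-0 sync, foo-2 delete, foo-1 delete,
	// so the highest ids are deleted first.
	for len(queue) > 0 {
		last := queue[len(queue)-1]
		queue = queue[:len(queue)-1]
		fmt.Println(last.name, last.event)
	}
}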
// petSetIterator implements a simple iterator over pets in the given petset.
type petSetIterator struct {
// ps is the petset for this iterator.
ps *apps.PetSet
// queue contains the elements to iterate over.
queue *petQueue
// errs is a list because we always want the iterator to drain.
errs []error
// petCount is the number of pets iterated over.
petCount int
}
// Next returns true for as long as there are elements in the underlying queue.
func (pi *petSetIterator) Next() bool {
var pet *pcb
var err error
if pi.petCount < pi.ps.Spec.Replicas {
pet, err = newPCB(fmt.Sprintf("%d", pi.petCount), pi.ps)
if err != nil {
pi.errs = append(pi.errs, err)
// Don't stop iterating over the set on errors. Caller handles nil.
pet = nil
}
pi.queue.enqueue(pet)
pi.petCount++
}
// Keep the iterator running till we've drained the pets in the queue.
return !pi.queue.empty()
}
// Value dequeues an element from the queue.
func (pi *petSetIterator) Value() *pcb {
return pi.queue.dequeue()
}
// NewPetSetIterator returns a new iterator. All pods in the given podList
// are used to seed the queue of the iterator.
func NewPetSetIterator(ps *apps.PetSet, podList []*api.Pod) *petSetIterator {
pi := &petSetIterator{
ps: ps,
queue: NewPetQueue(ps, podList),
errs: []error{},
petCount: 0,
}
return pi
}
// PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
type PodsByCreationTimestamp []*api.Pod
func (o PodsByCreationTimestamp) Len() int { return len(o) }
func (o PodsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o PodsByCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

View File

@ -0,0 +1,149 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/sets"
"testing"
)
func TestPetQueueCreates(t *testing.T) {
replicas := 3
ps := newPetSet(replicas)
q := NewPetQueue(ps, []*api.Pod{})
for i := 0; i < replicas; i++ {
pet, _ := newPCB(fmt.Sprintf("%v", i), ps)
q.enqueue(pet)
p := q.dequeue()
if p.event != syncPet {
t.Errorf("Failed to retrieve sync event from queue")
}
}
if q.dequeue() != nil {
t.Errorf("Expected no pets")
}
}
func TestPetQueueScaleDown(t *testing.T) {
replicas := 1
ps := newPetSet(replicas)
// knownPods are the pods in the system
knownPods := newPodList(ps, 3)
q := NewPetQueue(ps, knownPods)
// The iterator will insert a single replica, the enqueue
// mimics that behavior.
pet, _ := newPCB(fmt.Sprintf("%v", 0), ps)
q.enqueue(pet)
deletes := sets.NewString(fmt.Sprintf("%v-1", ps.Name), fmt.Sprintf("%v-2", ps.Name))
syncs := sets.NewString(fmt.Sprintf("%v-0", ps.Name))
// Confirm that 2 known pods are deleted
for i := 0; i < 3; i++ {
p := q.dequeue()
switch p.event {
case syncPet:
if !syncs.Has(p.pod.Name) {
t.Errorf("Unexpected sync %v expecting %+v", p.pod.Name, syncs)
}
case deletePet:
if !deletes.Has(p.pod.Name) {
t.Errorf("Unexpected deletes %v expecting %+v", p.pod.Name, deletes)
}
}
}
if q.dequeue() != nil {
t.Errorf("Expected no pets")
}
}
func TestPetQueueScaleUp(t *testing.T) {
replicas := 5
ps := newPetSet(replicas)
// knownPods are pods in the system
knownPods := newPodList(ps, 2)
q := NewPetQueue(ps, knownPods)
for i := 0; i < 5; i++ {
pet, _ := newPCB(fmt.Sprintf("%v", i), ps)
q.enqueue(pet)
}
for i := 4; i >= 0; i-- {
pet := q.dequeue()
expectedName := fmt.Sprintf("%v-%d", ps.Name, i)
if pet.event != syncPet || pet.pod.Name != expectedName {
t.Errorf("Unexpected pet %+v, expected %v", pet.pod.Name, expectedName)
}
}
}
func TestPetSetIteratorRelist(t *testing.T) {
replicas := 5
ps := newPetSet(replicas)
// knownPods are pods in the system
knownPods := newPodList(ps, 5)
for i := range knownPods {
knownPods[i].Spec.NodeName = fmt.Sprintf("foo-node-%v", i)
knownPods[i].Status.Phase = api.PodRunning
}
pi := NewPetSetIterator(ps, knownPods)
// A simple resync should not change identity of pods in the system
i := 0
for pi.Next() {
p := pi.Value()
if identityHash(ps, p.pod) != identityHash(ps, knownPods[i]) {
t.Errorf("Got unexpected identity hash from iterator.")
}
if p.event != syncPet {
t.Errorf("Got unexpected sync event for %v: %v", p.pod.Name, p.event)
}
i++
}
if i != 5 {
t.Errorf("Unexpected iterations %v, this probably means too many/few pets", i)
}
// Scale to 0 should delete all pods in system
ps.Spec.Replicas = 0
pi = NewPetSetIterator(ps, knownPods)
i = 0
for pi.Next() {
p := pi.Value()
if p.event != deletePet {
t.Errorf("Got unexpected sync event for %v: %v", p.pod.Name, p.event)
}
i++
}
if i != 5 {
t.Errorf("Unexpected iterations %v, this probably means too many/few pets", i)
}
// Relist with 0 replicas should no-op
pi = NewPetSetIterator(ps, []*api.Pod{})
if pi.Next() != false {
t.Errorf("Unexpected iteration without any replicas or pods in system")
}
}

View File

@ -0,0 +1,310 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"strconv"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"github.com/golang/glog"
)
// petLifeCycleEvent is used to communicate high level actions the controller
// needs to take on a given pet. It's recorded in the pcb. The recognized values
// are listed below.
type petLifeCycleEvent string
const (
syncPet petLifeCycleEvent = "sync"
deletePet petLifeCycleEvent = "delete"
// updateRetries is the number of Get/Update cycles we perform when an
// update fails.
updateRetries = 3
// PetSetInitAnnotation is an annotation which when set, indicates that the
// pet has finished initializing itself.
// TODO: Replace this with init container status.
PetSetInitAnnotation = "pod.alpha.kubernetes.io/initialized"
)
// pcb is the control block used to transmit all updates about a single pet.
// It serves as the manifest for a single pet. Users must populate the pod
// and parent fields to pass it around safely.
type pcb struct {
// pod is the desired pet pod.
pod *api.Pod
// pvcs is a list of desired persistent volume claims for the pet pod.
pvcs []api.PersistentVolumeClaim
// event is the lifecycle event associated with this update.
event petLifeCycleEvent
// id is the identity index of this pet.
id string
// parent is a pointer to the parent petset.
parent *apps.PetSet
}
// pvcClient is a client for managing persistent volume claims.
type pvcClient interface {
// DeletePVCs deletes the pvcs in the given pcb.
DeletePVCs(*pcb) error
// SyncPVCs creates/updates pvcs in the given pcb.
SyncPVCs(*pcb) error
}
// petSyncer syncs a single pet.
type petSyncer struct {
petClient
// blockingPet is an unhealthy pet from this iteration or a previous one;
// because it is not yet Running or is being Deleted, it blocks other
// creates/deletions.
blockingPet *pcb
}
// Sync syncs the given pet.
func (p *petSyncer) Sync(pet *pcb) error {
if pet == nil {
return nil
}
realPet, exists, err := p.Get(pet)
if err != nil {
return err
}
// There is no constraint except quota on the number of pvcs created.
// This is done per pet so we get a working cluster ASAP, even if the user
// runs out of quota.
if err := p.SyncPVCs(pet); err != nil {
return err
}
if exists {
if !p.isHealthy(realPet.pod) {
glog.Infof("PetSet %v waiting on unhealthy pet %v", pet.parent.Name, realPet.pod.Name)
}
return p.Update(realPet, pet)
}
if p.blockingPet != nil {
glog.Infof("Create of %v in PetSet %v blocked by unhealthy pet %v", pet.pod.Name, pet.parent.Name, p.blockingPet.pod.Name)
return nil
}
// This is counted as a create, even if it fails. We can't skip indices
// because some pets might allocate a special role to earlier indices.
// The returned error will force a requeue.
// TODO: What's the desired behavior if pet-0 is deleted while pet-1 is
// not yet healthy? currently pet-0 will wait till pet-1 is healthy,
// this feels safer, but might lead to deadlock.
p.blockingPet = pet
if err := p.Create(pet); err != nil {
return err
}
return nil
}
// Delete deletes the given pet, if no other pet in the petset is blocking a
// scale event.
func (p *petSyncer) Delete(pet *pcb) error {
if pet == nil {
return nil
}
realPet, exists, err := p.Get(pet)
if err != nil {
return err
}
if !exists {
return nil
}
if p.blockingPet != nil {
glog.Infof("Delete of %v in PetSet %v blocked by unhealthy pet %v", realPet.pod.Name, pet.parent.Name, p.blockingPet.pod.Name)
return nil
}
// This is counted as a delete, even if it fails.
// The returned error will force a requeue.
p.blockingPet = realPet
if !p.isDying(realPet.pod) {
glog.Infof("PetSet %v deleting pet %v", pet.parent.Name, pet.pod.Name)
return p.petClient.Delete(pet)
}
glog.Infof("PetSet %v waiting on pet %v to die in %v", pet.parent.Name, realPet.pod.Name, realPet.pod.DeletionTimestamp)
return nil
}
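Together, Sync and Delete enforce one-at-a-time ordering: the first pet found unhealthy or still dying becomes blockingPet, and every later create or delete in the same pass is skipped until a future sync observes it healthy. A toy simulation of the create-side gate (illustrative types, not the real pcb):

package main

import "fmt"

type pet struct {
	name   string
	exists bool
}

// syncer mirrors petSyncer.blockingPet: a freshly created pet is presumed
// unhealthy, so at most one new pet is brought up per sync pass.
type syncer struct{ blocking *pet }

func (s *syncer) sync(p *pet) {
	if p.exists {
		fmt.Printf("%v exists, updating in place\n", p.name)
		return
	}
	if s.blocking != nil {
		fmt.Printf("create of %v blocked by %v\n", p.name, s.blocking.name)
		return
	}
	fmt.Printf("creating %v\n", p.name)
	s.blocking = p
}

func main() {
	s := &syncer{}
	for _, p := range []*pet{{"web-0", true}, {"web-1", false}, {"web-2", false}} {
		s.sync(p)
	}
	// web-2 stays blocked until a later pass sees web-1 healthy.
}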
// petClient is a client for managing pets.
type petClient interface {
pvcClient
petHealthChecker
Delete(*pcb) error
Get(*pcb) (*pcb, bool, error)
Create(*pcb) error
Update(*pcb, *pcb) error
}
// apiServerPetClient is a petset-aware Kubernetes client.
type apiServerPetClient struct {
c *client.Client
recorder record.EventRecorder
petHealthChecker
}
// Get gets the pet in the pcb from the apiserver.
func (p *apiServerPetClient) Get(pet *pcb) (*pcb, bool, error) {
found := true
ns := pet.parent.Namespace
pod, err := podClient(p.c, ns).Get(pet.pod.Name)
if errors.IsNotFound(err) {
found = false
err = nil
}
if err != nil || !found {
return nil, found, err
}
realPet := *pet
realPet.pod = pod
return &realPet, true, nil
}
// Delete deletes the pet in the pcb from the apiserver.
func (p *apiServerPetClient) Delete(pet *pcb) error {
err := podClient(p.c, pet.parent.Namespace).Delete(pet.pod.Name, nil)
if errors.IsNotFound(err) {
err = nil
}
p.event(pet.parent, "Delete", fmt.Sprintf("pet: %v", pet.pod.Name), err)
return err
}
// Create creates the pet in the pcb.
func (p *apiServerPetClient) Create(pet *pcb) error {
_, err := podClient(p.c, pet.parent.Namespace).Create(pet.pod)
p.event(pet.parent, "Create", fmt.Sprintf("pet: %v", pet.pod.Name), err)
return err
}
// Update updates the pet in the 'pet' pcb to match the pet in the 'expectedPet' pcb.
func (p *apiServerPetClient) Update(pet *pcb, expectedPet *pcb) (updateErr error) {
var getErr error
pc := podClient(p.c, pet.parent.Namespace)
pod, needsUpdate, err := copyPetID(pet, expectedPet)
if err != nil || !needsUpdate {
return err
}
glog.Infof("Resetting pet %v to match PetSet %v spec", pod.Name, pet.parent.Name)
for i, p := 0, &pod; ; i++ {
_, updateErr = pc.Update(p)
if updateErr == nil || i >= updateRetries {
return updateErr
}
if p, getErr = pc.Get(pod.Name); getErr != nil {
return getErr
}
}
}
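The loop above is the usual optimistic-concurrency dance with the apiserver: try the update, and if it fails, re-Get the latest object and retry, up to updateRetries times. A self-contained sketch of the same pattern against a fake conflict-detecting store (not the real pod client):

package main

import (
	"errors"
	"fmt"
)

const updateRetries = 3

type obj struct{ resourceVersion, value string }

// fakeStore rejects writes whose resourceVersion is stale, like the apiserver.
type fakeStore struct{ current obj }

func (s *fakeStore) get() obj { return s.current }

func (s *fakeStore) update(o obj) error {
	if o.resourceVersion != s.current.resourceVersion {
		return errors.New("conflict: object was modified")
	}
	s.current = obj{resourceVersion: o.resourceVersion + "'", value: o.value}
	return nil
}

// updateWithRetries mirrors apiServerPetClient.Update: attempt, and on
// failure refresh the object and try again, a bounded number of times.
func updateWithRetries(s *fakeStore, mutate func(*obj)) error {
	o := s.get()
	for i := 0; ; i++ {
		mutate(&o)
		err := s.update(o)
		if err == nil || i >= updateRetries {
			return err
		}
		o = s.get() // refresh and retry
	}
}

func main() {
	s := &fakeStore{current: obj{resourceVersion: "1", value: "a"}}
	fmt.Println(updateWithRetries(s, func(o *obj) { o.value = "b" })) // <nil>
}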
// DeletePVCs should delete PVCs, when implemented.
func (p *apiServerPetClient) DeletePVCs(pet *pcb) error {
// TODO: Implement this when we delete pvcs.
return nil
}
func (p *apiServerPetClient) getPVC(pvcName, pvcNamespace string) (*api.PersistentVolumeClaim, bool, error) {
found := true
pvc, err := claimClient(p.c, pvcNamespace).Get(pvcName)
if errors.IsNotFound(err) {
found = false
err = nil
}
if err != nil || !found {
return nil, found, err
}
return pvc, true, nil
}
func (p *apiServerPetClient) createPVC(pvc *api.PersistentVolumeClaim) error {
_, err := claimClient(p.c, pvc.Namespace).Create(pvc)
return err
}
// SyncPVCs syncs pvcs in the given pcb.
func (p *apiServerPetClient) SyncPVCs(pet *pcb) error {
errMsg := ""
// Create new claims.
for i, pvc := range pet.pvcs {
_, exists, err := p.getPVC(pvc.Name, pet.parent.Namespace)
if !exists {
createErr := p.createPVC(&pet.pvcs[i])
if createErr != nil {
errMsg += fmt.Sprintf("Failed to create %v: %v", pvc.Name, createErr)
}
p.event(pet.parent, "Create", fmt.Sprintf("pvc: %v", pvc.Name), createErr)
} else if err != nil {
errMsg += fmt.Sprintf("Error trying to get pvc %v, %v.", pvc.Name, err)
}
// TODO: Check resource requirements and accessmodes, update if necessary
}
if len(errMsg) != 0 {
return fmt.Errorf("%v", errMsg)
}
return nil
}
// event formats an event for the given runtime object.
func (p *apiServerPetClient) event(obj runtime.Object, reason, msg string, err error) {
if err != nil {
p.recorder.Eventf(obj, api.EventTypeWarning, fmt.Sprintf("Failed%v", reason), fmt.Sprintf("%v, error: %v", msg, err))
} else {
p.recorder.Eventf(obj, api.EventTypeNormal, fmt.Sprintf("Successful%v", reason), msg)
}
}
// petHealthChecker is an interface to check pet health. It makes a boolean
// decision based on the given pod.
type petHealthChecker interface {
isHealthy(*api.Pod) bool
isDying(*api.Pod) bool
}
// defaultPetHealthChecker does basic health checking.
// It doesn't update, probe or get the pod.
type defaultPetHealthChecker struct{}
// isHealthy returns true if the pod is running and has the
// "pod.alpha.kubernetes.io/initialized" set to "true".
func (d *defaultPetHealthChecker) isHealthy(pod *api.Pod) bool {
if pod == nil || pod.Status.Phase != api.PodRunning {
return false
}
initialized, ok := pod.Annotations[PetSetInitAnnotation]
if !ok {
glog.Infof("PetSet pod %v in %v, waiting on annotation %v", api.PodRunning, pod.Name, PetSetInitAnnotation)
return false
}
b, err := strconv.ParseBool(initialized)
if err != nil {
return false
}
return b
}
// isDying returns true if the pod has a non-nil deletion timestamp. Since the
// timestamp can only decrease, once this method returns true for a given pet, it
// will never return false.
func (d *defaultPetHealthChecker) isDying(pod *api.Pod) bool {
return pod != nil && pod.DeletionTimestamp != nil
}
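The health decision reduces to two checks: the pod is Running, and its init annotation parses as boolean true. A self-contained sketch of that decision table (toy pod type; the annotation key is the real one from this file):

package main

import (
	"fmt"
	"strconv"
)

const initAnnotation = "pod.alpha.kubernetes.io/initialized"

type pod struct {
	running     bool
	annotations map[string]string
}

// isHealthy mirrors defaultPetHealthChecker: Running phase plus an init
// annotation that strconv.ParseBool accepts as true.
func isHealthy(p *pod) bool {
	if p == nil || !p.running {
		return false
	}
	b, err := strconv.ParseBool(p.annotations[initAnnotation])
	return err == nil && b
}

func main() {
	fmt.Println(isHealthy(&pod{running: true, annotations: map[string]string{initAnnotation: "true"}}))  // true
	fmt.Println(isHealthy(&pod{running: true, annotations: map[string]string{}}))                        // false: not initialized
	fmt.Println(isHealthy(&pod{running: false, annotations: map[string]string{initAnnotation: "true"}})) // false: not Running
}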

View File

@ -0,0 +1,356 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"reflect"
"sort"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
// Time to sleep before polling to see if the pod cache has synced.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// number of retries for a status update.
statusUpdateRetries = 2
// period to relist petsets and verify pets
petSetResyncPeriod = 30 * time.Second
)
// PetSetController controls petsets.
type PetSetController struct {
kubeClient *client.Client
// newSyncer returns an interface capable of syncing a single pet.
// Abstracted out for testing.
newSyncer func(*pcb) *petSyncer
// podStore is a cache of watched pods.
podStore cache.StoreToPodLister
// podStoreSynced returns true if the pod store has synced at least once.
podStoreSynced func() bool
// Watches changes to all pods.
podController framework.ControllerInterface
// A store of PetSets, populated by the psController.
psStore cache.StoreToPetSetLister
// Watches changes to all PetSets.
psController *framework.Controller
// A store of the single unhealthy pet blocking progress for a given petset
blockingPetStore *unhealthyPetTracker
// Controllers that need to be synced.
queue *workqueue.Type
// syncHandler handles sync events for petsets.
// Abstracted as a func to allow injection for testing.
syncHandler func(psKey string) []error
}
// NewPetSetController creates a new petset controller.
func NewPetSetController(podInformer framework.SharedInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "petset"})
pc := &apiServerPetClient{kubeClient, recorder, &defaultPetHealthChecker{}}
psc := &PetSetController{
kubeClient: kubeClient,
blockingPetStore: newUnHealthyPetTracker(pc),
newSyncer: func(blockingPet *pcb) *petSyncer {
return &petSyncer{pc, blockingPet}
},
queue: workqueue.New(),
}
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
// lookup the petset and enqueue
AddFunc: psc.addPod,
// lookup current and old petset if labels changed
UpdateFunc: psc.updatePod,
// lookup petset accounting for deletion tombstones
DeleteFunc: psc.deletePod,
})
psc.podStore.Store = podInformer.GetStore()
psc.podController = podInformer.GetController()
psc.psStore.Store, psc.psController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return psc.kubeClient.Apps().PetSets(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return psc.kubeClient.Apps().PetSets(api.NamespaceAll).Watch(options)
},
},
&apps.PetSet{},
petSetResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: psc.enqueuePetSet,
UpdateFunc: func(old, cur interface{}) {
oldPS := old.(*apps.PetSet)
curPS := cur.(*apps.PetSet)
if oldPS.Status.Replicas != curPS.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for PetSet: %v, %d->%d", curPS.Name, oldPS.Status.Replicas, curPS.Status.Replicas)
}
psc.enqueuePetSet(cur)
},
DeleteFunc: psc.enqueuePetSet,
},
)
// TODO: Watch volumes
psc.podStoreSynced = psc.podController.HasSynced
psc.syncHandler = psc.Sync
return psc
}
// Run runs the petset controller.
func (psc *PetSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting petset controller")
go psc.podController.Run(stopCh)
go psc.psController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(psc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down petset controller")
psc.queue.ShutDown()
}
// addPod adds the petset for the pod to the sync queue
func (psc *PetSetController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
glog.V(4).Infof("Pod %s created, labels: %+v", pod.Name, pod.Labels)
ps := psc.getPetSetForPod(pod)
if ps == nil {
return
}
psc.enqueuePetSet(ps)
}
// updatePod adds the petset for the current and old pods to the sync queue.
// If the labels of the pod didn't change, this method enqueues a single petset.
func (psc *PetSetController) updatePod(old, cur interface{}) {
if api.Semantic.DeepEqual(old, cur) {
return
}
curPod := cur.(*api.Pod)
oldPod := old.(*api.Pod)
ps := psc.getPetSetForPod(curPod)
if ps == nil {
return
}
psc.enqueuePetSet(ps)
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
if oldPS := psc.getPetSetForPod(oldPod); oldPS != nil {
psc.enqueuePetSet(oldPS)
}
}
}
// deletePod enqueues the petset for the pod accounting for deletion tombstones.
func (psc *PetSetController) deletePod(obj interface{}) {
pod, ok := obj.(*api.Pod)
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
// the deleted key/value. Note that this value might be stale. If the pod
// changed labels the new PetSet will not be woken up till the periodic resync.
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Errorf("couldn't get object from tombstone %+v", obj)
return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("tombstone contained object that is not a pod %+v", obj)
return
}
}
glog.V(4).Infof("Pod %s/%s deleted through %v.", pod.Namespace, pod.Name, utilruntime.GetCaller())
if ps := psc.getPetSetForPod(pod); ps != nil {
psc.enqueuePetSet(ps)
}
}
// getPodsForPetSet returns the pods that match the selector of the given petset.
func (psc *PetSetController) getPodsForPetSet(ps *apps.PetSet) ([]*api.Pod, error) {
// TODO: Do we want the petset to fight with RCs? check parent petset annotation, or name prefix?
sel, err := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
if err != nil {
return []*api.Pod{}, err
}
petList, err := psc.podStore.Pods(ps.Namespace).List(sel)
if err != nil {
return []*api.Pod{}, err
}
pods := []*api.Pod{}
// Take the address of the slice element, not of the loop variable, so each
// entry points at a distinct pod.
for i := range petList.Items {
pods = append(pods, &petList.Items[i])
}
return pods, nil
}
// getPetSetForPod returns the pet set managing the given pod.
func (psc *PetSetController) getPetSetForPod(pod *api.Pod) *apps.PetSet {
ps, err := psc.psStore.GetPodPetSets(pod)
if err != nil {
glog.V(4).Infof("No PetSets found for pod %v, PetSet controller will avoid syncing", pod.Name)
return nil
}
// Resolve an overlapping petset tie by creation timestamp.
// Let's hope users don't create overlapping petsets.
if len(ps) > 1 {
glog.Errorf("user error! more than one PetSet is selecting pods with labels: %+v", pod.Labels)
sort.Sort(overlappingPetSets(ps))
}
return &ps[0]
}
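// For illustration, the tie-break behaves like the following sketch, where
// older and newer are hypothetical PetSets sharing a selector:
//
//   sets := []apps.PetSet{newer, older}
//   sort.Sort(overlappingPetSets(sets))
//   // sets[0] == older: ordering is by CreationTimestamp with name as tie
//   // breaker, so every pod consistently resolves to the oldest matching PetSet.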
// enqueuePetSet enqueues the given petset in the work queue.
func (psc *PetSetController) enqueuePetSet(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Cound't get key for object %+v: %v", obj, err)
return
}
psc.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (psc *PetSetController) worker() {
for {
key, quit := psc.queue.Get()
if quit {
// Return from worker itself (not just from a closure) so the
// goroutine exits once the queue is shut down.
return
}
func() {
defer psc.queue.Done(key)
if errs := psc.syncHandler(key.(string)); len(errs) != 0 {
glog.Errorf("Error syncing PetSet %v, requeuing: %v", key.(string), errs)
psc.queue.Add(key)
}
}()
}
}
// Sync syncs the given petset.
func (psc *PetSetController) Sync(key string) []error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing pet set %q (%v)", key, time.Now().Sub(startTime))
}()
if !psc.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
return []error{fmt.Errorf("waiting for pods controller to sync")}
}
obj, exists, err := psc.psStore.Store.GetByKey(key)
if err != nil {
glog.Errorf("Unable to retrieve PetSet %v from store: %v", key, err)
return []error{err}
}
if !exists {
if err = psc.blockingPetStore.store.Delete(key); err != nil {
return []error{err}
}
glog.Infof("PetSet %v has been deleted", key)
return []error{}
}
ps := *obj.(*apps.PetSet)
petList, err := psc.getPodsForPetSet(&ps)
if err != nil {
return []error{err}
}
numPets, errs := psc.syncPetSet(&ps, petList)
if err := updatePetCount(psc.kubeClient, ps, numPets); err != nil {
glog.Infof("Failed to update replica count for petset %v/%v; requeuing; error: %v", ps.Namespace, ps.Name, err)
errs = append(errs, err)
}
return errs
}
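// Note: keys are the "namespace/name" strings produced by controller.KeyFunc
// (e.g. "default/web" for a hypothetical PetSet "web"), which is why Sync can
// clear a deleted PetSet's blocking-pet record with the key it was enqueued under.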
// syncPetSet syncs a tuple of (petset, pets).
func (psc *PetSetController) syncPetSet(ps *apps.PetSet, pets []*api.Pod) (int, []error) {
glog.Infof("Syncing PetSet %v/%v with %d pets", ps.Namespace, ps.Name, len(pets))
it := NewPetSetIterator(ps, pets)
blockingPet, err := psc.blockingPetStore.Get(ps, pets)
if err != nil {
return 0, []error{err}
}
if blockingPet != nil {
glog.Infof("PetSet %v blocked from scaling on pet %v", ps.Name, blockingPet.pod.Name)
}
petManager := psc.newSyncer(blockingPet)
numPets := 0
for it.Next() {
pet := it.Value()
if pet == nil {
continue
}
switch pet.event {
case syncPet:
err = petManager.Sync(pet)
if err == nil {
numPets++
}
case deletePet:
err = petManager.Delete(pet)
}
if err != nil {
it.errs = append(it.errs, err)
}
}
if err := psc.blockingPetStore.Add(petManager.blockingPet); err != nil {
it.errs = append(it.errs, err)
}
// TODO: GC pvcs. We can't delete them per pet because of grace period, and
// in fact we *don't want to* till petset is stable to guarantee that bugs
// in the controller don't corrupt user data.
return numPets, it.errs
}
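// In short, and as the tests below exercise: the iterator (defined elsewhere
// in this commit) emits a syncPet event for every pet that should exist and a
// deletePet event for every excess pet, a single recorded unhealthy pet blocks
// both creates and deletes until it recovers or disappears, and numPets counts
// only the pets that synced cleanly.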

View File

@ -0,0 +1,264 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"math/rand"
"reflect"
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller"
)
func newFakePetSetController() (*PetSetController, *fakePetClient) {
fpc := newFakePetClient()
return &PetSetController{
kubeClient: nil,
blockingPetStore: newUnHealthyPetTracker(fpc),
podStoreSynced: func() bool { return true },
psStore: cache.StoreToPetSetLister{Store: cache.NewStore(controller.KeyFunc)},
podStore: cache.StoreToPodLister{Store: cache.NewStore(controller.KeyFunc)},
newSyncer: func(blockingPet *pcb) *petSyncer {
return &petSyncer{fpc, blockingPet}
},
}, fpc
}
func checkPets(ps *apps.PetSet, creates, deletes int, fc *fakePetClient, t *testing.T) {
if fc.petsCreated != creates || fc.petsDeleted != deletes {
t.Errorf("Found (creates: %d, deletes: %d), expected (creates: %d, deletes: %d)", fc.petsCreated, fc.petsDeleted, creates, deletes)
}
gotClaims := map[string]api.PersistentVolumeClaim{}
for _, pvc := range fc.claims {
gotClaims[pvc.Name] = pvc
}
for i := range fc.pets {
expectedPet, _ := newPCB(fmt.Sprintf("%v", i), ps)
if identityHash(ps, fc.pets[i].pod) != identityHash(ps, expectedPet.pod) {
t.Errorf("Unexpected pet at index %d", i)
}
for _, pvc := range expectedPet.pvcs {
gotPVC, ok := gotClaims[pvc.Name]
if !ok {
t.Errorf("PVC %v not created for pet %v", pvc.Name, expectedPet.pod.Name)
}
if !reflect.DeepEqual(gotPVC.Spec, pvc.Spec) {
t.Errorf("got PVC %v differs from created pvc", pvc.Name)
}
}
}
}
func scalePetSet(t *testing.T, ps *apps.PetSet, psc *PetSetController, fc *fakePetClient, scale int) []error {
errs := []error{}
for i := 0; i < scale; i++ {
pl := fc.getPodList()
if len(pl) != i {
t.Errorf("Unexpected number of pets, expected %d found %d", i, len(fc.pets))
}
_, syncErrs := psc.syncPetSet(ps, pl)
errs = append(errs, syncErrs...)
fc.setHealthy(i)
checkPets(ps, i+1, 0, fc, t)
}
return errs
}
func saturatePetSet(t *testing.T, ps *apps.PetSet, psc *PetSetController, fc *fakePetClient) {
errs := scalePetSet(t, ps, psc, fc, ps.Spec.Replicas)
if len(errs) != 0 {
t.Errorf("%v", errs)
}
}
func TestPetSetControllerCreates(t *testing.T) {
psc, fc := newFakePetSetController()
replicas := 3
ps := newPetSet(replicas)
saturatePetSet(t, ps, psc, fc)
podList := fc.getPodList()
// Deleted pet gets recreated
fc.pets = fc.pets[:replicas-1]
if _, errs := psc.syncPetSet(ps, podList); len(errs) != 0 {
t.Errorf("%v", errs)
}
checkPets(ps, replicas+1, 0, fc, t)
}
func TestPetSetControllerDeletes(t *testing.T) {
psc, fc := newFakePetSetController()
replicas := 4
ps := newPetSet(replicas)
saturatePetSet(t, ps, psc, fc)
// Drain
errs := []error{}
ps.Spec.Replicas = 0
knownPods := fc.getPodList()
for i := replicas - 1; i >= 0; i-- {
if len(fc.pets) != i+1 {
t.Errorf("Unexpected number of pets, expected %d found %d", i, len(fc.pets))
}
_, syncErrs := psc.syncPetSet(ps, knownPods)
errs = append(errs, syncErrs...)
}
if len(errs) != 0 {
t.Errorf("%v", errs)
}
checkPets(ps, replicas, replicas, fc, t)
}
func TestPetSetControllerRespectsTermination(t *testing.T) {
psc, fc := newFakePetSetController()
replicas := 4
ps := newPetSet(replicas)
saturatePetSet(t, ps, psc, fc)
fc.setDeletionTimestamp(replicas - 1)
ps.Spec.Replicas = 2
_, errs := psc.syncPetSet(ps, fc.getPodList())
if len(errs) != 0 {
t.Errorf("%v", errs)
}
// Finding a pod with the deletion timestamp will pause all deletions.
knownPods := fc.getPodList()
if len(knownPods) != replicas {
t.Errorf("Pods deleted prematurely before deletion timestamp expired, len %d", len(knownPods))
}
fc.pets = fc.pets[:replicas-1]
_, errs = psc.syncPetSet(ps, fc.getPodList())
if len(errs) != 0 {
t.Errorf("%v", errs)
}
checkPets(ps, replicas, 1, fc, t)
}
func TestPetSetControllerRespectsOrder(t *testing.T) {
psc, fc := newFakePetSetController()
replicas := 4
ps := newPetSet(replicas)
saturatePetSet(t, ps, psc, fc)
errs := []error{}
ps.Spec.Replicas = 0
// Shuffle known list and check that pets are deleted in reverse
knownPods := fc.getPodList()
for i := range knownPods {
j := rand.Intn(i + 1)
knownPods[i], knownPods[j] = knownPods[j], knownPods[i]
}
for i := 0; i < replicas; i++ {
if len(fc.pets) != replicas-i {
t.Errorf("Unexpected number of pets, expected %d found %d", i, len(fc.pets))
}
_, syncErrs := psc.syncPetSet(ps, knownPods)
errs = append(errs, syncErrs...)
checkPets(ps, replicas, i+1, fc, t)
}
if len(errs) != 0 {
t.Errorf("%v", errs)
}
}
func TestPetSetControllerBlocksScaling(t *testing.T) {
psc, fc := newFakePetSetController()
replicas := 5
ps := newPetSet(replicas)
scalePetSet(t, ps, psc, fc, 3)
// Create 4th pet, then before flipping it to healthy, kill the first pet.
// There should only be 1 unhealthy pet at a time.
pl := fc.getPodList()
if _, errs := psc.syncPetSet(ps, pl); len(errs) != 0 {
t.Errorf("%v", errs)
}
deletedPod := pl[0]
fc.deletePetAtIndex(0)
pl = fc.getPodList()
if _, errs := psc.syncPetSet(ps, pl); len(errs) != 0 {
t.Errorf("%v", errs)
}
newPodList := fc.getPodList()
for _, p := range newPodList {
if p.Name == deletedPod.Name {
t.Errorf("Deleted pod was created while existing pod was unhealthy")
}
}
fc.setHealthy(len(newPodList) - 1)
if _, errs := psc.syncPetSet(ps, pl); len(errs) != 0 {
t.Errorf("%v", errs)
}
found := false
for _, p := range fc.getPodList() {
if p.Name == deletedPod.Name {
found = true
}
}
if !found {
t.Errorf("Deleted pod was not created after existing pods became healthy")
}
}
func TestPetSetBlockingPetIsCleared(t *testing.T) {
psc, fc := newFakePetSetController()
ps := newPetSet(3)
scalePetSet(t, ps, psc, fc, 1)
if blocking, err := psc.blockingPetStore.Get(ps, fc.getPodList()); err != nil || blocking != nil {
t.Errorf("Unexpected blocking pet %v, err %v", blocking, err)
}
// 1 not yet healthy pet
psc.syncPetSet(ps, fc.getPodList())
if blocking, err := psc.blockingPetStore.Get(ps, fc.getPodList()); err != nil || blocking == nil {
t.Errorf("Expected blocking pet %v, err %v", blocking, err)
}
// Deleting the petset should clear the blocking pet
if err := psc.psStore.Store.Delete(ps); err != nil {
t.Fatalf("Unable to delete pet %v from petset controller store.", ps.Name)
}
if errs := psc.Sync(fmt.Sprintf("%v/%v", ps.Namespace, ps.Name)); len(errs) != 0 {
t.Errorf("Error during sync of deleted petset %v", errs)
}
fc.pets = []*pcb{}
fc.petsCreated = 0
if blocking, err := psc.blockingPetStore.Get(ps, fc.getPodList()); err != nil || blocking != nil {
t.Errorf("Unexpected blocking pet %v, err %v", blocking, err)
}
saturatePetSet(t, ps, psc, fc)
// Make sure we don't leak the final blocking pet in the store.
psc.syncPetSet(ps, fc.getPodList())
if p, exists, err := psc.blockingPetStore.store.GetByKey(fmt.Sprintf("%v/%v", ps.Namespace, ps.Name)); err != nil || exists {
t.Errorf("Unexpected blocking pet, err %v: %+v", err, p)
}
}

View File

@ -0,0 +1,168 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package petset
import (
"fmt"
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"github.com/golang/glog"
)
// overlappingPetSets sorts a list of PetSets by creation timestamp, using their names as a tie breaker.
// Generally used to tie break between PetSets that have overlapping selectors.
type overlappingPetSets []apps.PetSet
func (o overlappingPetSets) Len() int { return len(o) }
func (o overlappingPetSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o overlappingPetSets) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}
// updatePetCount attempts to update the Status.Replicas of the given PetSet, with a single GET/PUT retry.
func updatePetCount(kubeClient *client.Client, ps apps.PetSet, numPets int) (updateErr error) {
if ps.Status.Replicas == numPets || kubeClient == nil {
return nil
}
psClient := kubeClient.Apps().PetSets(ps.Namespace)
var getErr error
for i, ps := 0, &ps; ; i++ {
glog.V(4).Infof("Updating replica count for PetSet: %s/%s, replicas %d->%d (need %d)",
ps.Namespace, ps.Name, ps.Status.Replicas, numPets, ps.Spec.Replicas)
ps.Status = apps.PetSetStatus{Replicas: numPets}
_, updateErr = psClient.UpdateStatus(ps)
if updateErr == nil || i >= statusUpdateRetries {
return updateErr
}
if ps, getErr = psClient.Get(ps.Name); getErr != nil {
return getErr
}
}
}
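// The same get/update/retry shape recurs in this commit (e.g.
// UpdatePodWithRetries in the e2e framework further down). A minimal generic
// sketch — updateWithRetry, get, update, and retries are illustrations, not
// names from this commit:
func updateWithRetry(get func() (*apps.PetSet, error), update func(*apps.PetSet) error, retries int) error {
ps, err := get()
if err != nil {
return err
}
for i := 0; ; i++ {
// Try the update; on failure, refetch the object and retry until the
// budget is exhausted.
if err = update(ps); err == nil || i >= retries {
return err
}
if ps, err = get(); err != nil {
return err
}
}
}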
// claimClient returns the pvcClient for the given kubeClient/ns.
func claimClient(kubeClient *client.Client, ns string) client.PersistentVolumeClaimInterface {
return kubeClient.PersistentVolumeClaims(ns)
}
// podClient returns the podClient for the given kubeClient/ns.
func podClient(kubeClient *client.Client, ns string) client.PodInterface {
return kubeClient.Pods(ns)
}
// unhealthyPetTracker tracks unhealthy pets for petsets.
type unhealthyPetTracker struct {
pc petClient
store cache.Store
storeLock sync.Mutex
}
// Get returns a previously recorded blocking pet for the given petset.
func (u *unhealthyPetTracker) Get(ps *apps.PetSet, knownPets []*api.Pod) (*pcb, error) {
u.storeLock.Lock()
defer u.storeLock.Unlock()
// We "Get" by key but "Add" by object because the store interface doesn't
// allow us to Get/Add a related obj (eg petset: blocking pet).
key, err := controller.KeyFunc(ps)
if err != nil {
return nil, err
}
obj, exists, err := u.store.GetByKey(key)
if err != nil {
return nil, err
}
hc := defaultPetHealthChecker{}
// There's no unhealthy pet blocking a scale event, but this might be
// a controller manager restart. If it is, knownPets can be trusted.
if !exists {
for _, p := range knownPets {
if hc.isHealthy(p) && !hc.isDying(p) {
glog.V(4).Infof("Ignoring healthy pet %v for PetSet %v", p.Name, ps.Name)
continue
}
glog.Infof("No recorded blocking pet, but found unhealty pet %v for PetSet %v", p.Name, ps.Name)
return &pcb{pod: p, parent: ps}, nil
}
return nil, nil
}
// This is a pet that's blocking further creates/deletes of a petset. If it
// disappears, it's no longer blocking. If it exists, it continues to block
// till it turns healthy or disappears.
bp := obj.(*pcb)
blockingPet, exists, err := u.pc.Get(bp)
if err != nil {
return nil, err
}
if !exists {
glog.V(4).Infof("Clearing blocking pet %v for PetSet %v because it's been deleted", bp.pod.Name, ps.Name)
return nil, nil
}
blockingPetPod := blockingPet.pod
if hc.isHealthy(blockingPetPod) && !hc.isDying(blockingPetPod) {
glog.V(4).Infof("Clearing blocking pet %v for PetSet %v because it's healthy", bp.pod.Name, ps.Name)
u.store.Delete(blockingPet)
blockingPet = nil
}
return blockingPet, nil
}
// Add records the given pet as a blocking pet.
func (u *unhealthyPetTracker) Add(blockingPet *pcb) error {
u.storeLock.Lock()
defer u.storeLock.Unlock()
if blockingPet == nil {
return nil
}
glog.V(4).Infof("Adding blocking pet %v for PetSet %v", blockingPet.pod.Name, blockingPet.parent.Name)
return u.store.Add(blockingPet)
}
// newUnHealthyPetTracker returns a tracker for unhealthy pets that block progress of petsets.
func newUnHealthyPetTracker(pc petClient) *unhealthyPetTracker {
return &unhealthyPetTracker{pc: pc, store: cache.NewStore(pcbKeyFunc)}
}
// pcbKeyFunc computes the key for a given pcb.
// If it's given a key, it simply returns it.
func pcbKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(string); ok {
return key, nil
}
p, ok := obj.(*pcb)
if !ok {
return "", fmt.Errorf("not a valid pet control block %+v", p)
}
if p.parent == nil {
return "", fmt.Errorf("cannot compute pet control block key without parent pointer %+v", p)
}
return controller.KeyFunc(p.parent)
}
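// For illustration: a blocking pet is keyed by its parent, so for a pcb whose
// parent is PetSet "web" in namespace "default", pcbKeyFunc returns
// "default/web" — the same key Sync uses to clear the record when the PetSet
// is deleted.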

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/batch"
"k8s.io/kubernetes/pkg/apis/extensions"
@ -99,6 +100,7 @@ func describerMap(c *client.Client) map[unversioned.GroupKind]Describer {
extensions.Kind("Deployment"): &DeploymentDescriber{adapter.FromUnversionedClient(c)},
extensions.Kind("Job"): &JobDescriber{c},
batch.Kind("Job"): &JobDescriber{c},
apps.Kind("PetSet"): &PetSetDescriber{c},
extensions.Kind("Ingress"): &IngressDescriber{c},
}
@ -1630,6 +1632,46 @@ func describeNode(node *api.Node, nodeNonTerminatedPodsList *api.PodList, events
})
}
type PetSetDescriber struct {
client *client.Client
}
func (p *PetSetDescriber) Describe(namespace, name string) (string, error) {
ps, err := p.client.Apps().PetSets(namespace).Get(name)
if err != nil {
return "", err
}
pc := p.client.Pods(namespace)
selector, err := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
if err != nil {
return "", err
}
running, waiting, succeeded, failed, err := getPodStatusForController(pc, selector)
if err != nil {
return "", err
}
return tabbedString(func(out io.Writer) error {
fmt.Fprintf(out, "Name:\t%s\n", ps.Name)
fmt.Fprintf(out, "Namespace:\t%s\n", ps.Namespace)
fmt.Fprintf(out, "Image(s):\t%s\n", makeImageList(&ps.Spec.Template.Spec))
fmt.Fprintf(out, "Selector:\t%s\n", unversioned.FormatLabelSelector(ps.Spec.Selector))
fmt.Fprintf(out, "Labels:\t%s\n", labels.FormatLabels(ps.Labels))
fmt.Fprintf(out, "Replicas:\t%d current / %d desired\n", ps.Status.Replicas, ps.Spec.Replicas)
fmt.Fprintf(out, "Annotations:\t%s\n", labels.FormatLabels(ps.Annotations))
fmt.Fprintf(out, "CreationTimestamp:\t%s\n", ps.CreationTimestamp.Time.Format(time.RFC1123Z))
fmt.Fprintf(out, "Pods Status:\t%d Running / %d Waiting / %d Succeeded / %d Failed\n", running, waiting, succeeded, failed)
describeVolumes(ps.Spec.Template.Spec.Volumes, out, "")
events, _ := p.client.Events(namespace).Search(ps)
if events != nil {
DescribeEvents(events, out)
}
return nil
})
}
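// For a healthy 3-replica PetSet like the one in the e2e test below, the
// describer renders roughly the following (values illustrative, columns
// produced by tabbedString):
//
//   Name:         web
//   Namespace:    default
//   Image(s):     gcr.io/google_containers/nginx-slim:0.5
//   Selector:     baz=blah,foo=bar
//   Labels:       baz=blah,foo=bar
//   Replicas:     3 current / 3 desired
//   Pods Status:  3 Running / 0 Waiting / 0 Succeeded / 0 Failed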
// HorizontalPodAutoscalerDescriber generates information about a horizontal pod autoscaler.
type HorizontalPodAutoscalerDescriber struct {
client *client.Client

View File

@ -86,13 +86,9 @@ func (petSetStrategy) AllowCreateOnUpdate() bool {
// ValidateUpdate is the default update validation for an end user.
func (petSetStrategy) ValidateUpdate(ctx api.Context, obj, old runtime.Object) field.ErrorList {
// Updates were narrowed to Spec.Replicas in validation (see ValidatePetSetUpdate),
// so run the regular validation here.
validationErrorList := validation.ValidatePetSet(obj.(*apps.PetSet))
updateErrorList := validation.ValidatePetSetUpdate(obj.(*apps.PetSet), old.(*apps.PetSet))
return append(validationErrorList, updateErrorList...)
}
// AllowUnconditionalUpdate is the default update policy for PetSet objects.

View File

@ -64,6 +64,7 @@ func TestPetSetStrategy(t *testing.T) {
t.Errorf("Unexpected error validating %v", errs)
}
// Just Spec.Replicas is allowed to change
validPs := &apps.PetSet{
ObjectMeta: api.ObjectMeta{Name: ps.Name, Namespace: ps.Namespace},
Spec: apps.PetSetSpec{
@ -74,6 +75,13 @@ func TestPetSetStrategy(t *testing.T) {
}
Strategy.PrepareForUpdate(validPs, ps)
errs = Strategy.ValidateUpdate(ctx, validPs, ps)
if len(errs) != 0 {
t.Errorf("Updating spec.Replicas is allowed on a petset.")
}
validPs.Spec.Selector = &unversioned.LabelSelector{MatchLabels: map[string]string{"a": "bar"}}
Strategy.PrepareForUpdate(validPs, ps)
errs = Strategy.ValidateUpdate(ctx, validPs, ps)
if len(errs) == 0 {
t.Errorf("Expected a validation error since updates are disallowed on petsets.")
}

View File

@ -3743,3 +3743,21 @@ func CoreDump(dir string) {
Logf("Error running cluster/log-dump.sh: %v", err)
}
}
func UpdatePodWithRetries(client *client.Client, ns, name string, update func(*api.Pod)) (*api.Pod, error) {
for i := 0; i < 3; i++ {
pod, err := client.Pods(ns).Get(name)
if err != nil {
return nil, fmt.Errorf("Failed to get pod %q: %v", name, err)
}
update(pod)
pod, err = client.Pods(ns).Update(pod)
if err == nil {
return pod, nil
}
if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
return nil, fmt.Errorf("Failed to update pod %q: %v", name, err)
}
}
return nil, fmt.Errorf("Too many retries updating Pod %q", name)
}
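// Usage sketch, mirroring how setHealthy in the PetSet e2e below drives it
// (pod name hypothetical):
//
//   pod, err := framework.UpdatePodWithRetries(c, ns, "pet-0", func(up *api.Pod) {
//       up.Annotations[petset.PetSetInitAnnotation] = "true"
//   })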

378
test/e2e/petset.go Normal file
View File

@ -0,0 +1,378 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"strconv"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/api"
apierrs "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/apps"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/petset"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
"speter.net/go/exp/math/dec/inf"
)
const (
petsetPoll = 10 * time.Second
petsetTimeout = 5 * time.Minute
)
var _ = framework.KubeDescribe("PetSet", func() {
f := framework.NewDefaultFramework("petset")
psName := "pet"
labels := map[string]string{
"foo": "bar",
"baz": "blah",
}
headlessSvcName := "test"
var ns string
var c *client.Client
BeforeEach(func() {
var err error
c, err = framework.LoadClient()
Expect(err).NotTo(HaveOccurred())
ns = f.Namespace.Name
By("creating service " + headlessSvcName + " in namespace " + ns)
headlessService := createServiceSpec(headlessSvcName, true, labels)
_, err = c.Services(ns).Create(headlessService)
Expect(err).NotTo(HaveOccurred())
})
It("provide basic identity [Feature:PetSet]", func() {
By("creating petset " + psName + " in namespace " + ns)
defer func() {
err := c.Apps().PetSets(ns).Delete(psName, nil)
Expect(err).NotTo(HaveOccurred())
}()
petMounts := []api.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts := []api.VolumeMount{{Name: "home", MountPath: "/home"}}
ps := newPetSet(psName, ns, headlessSvcName, 3, petMounts, podMounts, labels)
_, err := c.Apps().PetSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
pt := petTester{c: c}
By("Saturating pet set " + ps.Name)
pt.saturate(ps)
cmd := "echo $(hostname) > /data/hostname"
By("Running " + cmd + " in all pets")
pt.execInPets(ps, cmd)
By("Restarting pet set " + ps.Name)
pt.restart(ps)
pt.saturate(ps)
cmd = "if [ \"$(cat /data/hostname)\" = \"$(hostname)\" ]; then exit 0; else exit 1; fi"
By("Running " + cmd + " in all pets")
pt.execInPets(ps, cmd)
})
It("should handle healthy pet restarts during scale [Feature:PetSet]", func() {
By("creating petset " + psName + " in namespace " + ns)
defer func() {
err := c.Apps().PetSets(ns).Delete(psName, nil)
Expect(err).NotTo(HaveOccurred())
}()
petMounts := []api.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
podMounts := []api.VolumeMount{{Name: "home", MountPath: "/home"}}
ps := newPetSet(psName, ns, headlessSvcName, 2, petMounts, podMounts, labels)
_, err := c.Apps().PetSets(ns).Create(ps)
Expect(err).NotTo(HaveOccurred())
pt := petTester{c: c}
pt.waitForRunning(1, ps)
By("Marking pet at index 0 as healthy.")
pt.setHealthy(ps)
By("Waiting for pet at index 1 to enter running.")
pt.waitForRunning(2, ps)
// Now we have 1 healthy and 1 unhealthy pet. Deleting the healthy pet should *not*
// create a new pet till the remaining pet becomes healthy, which won't happen till
// we set the healthy bit.
By("Deleting healthy pet at index 0.")
pt.deletePetAtIndex(0, ps)
By("Confirming pet at index 0 is not recreated.")
pt.confirmPetCount(1, ps, 10*time.Second)
By("Deleting unhealthy pet at index 1.")
pt.deletePetAtIndex(1, ps)
By("Confirming all pets in petset are created.")
pt.saturate(ps)
})
})
type petTester struct {
c *client.Client
}
func (p *petTester) execInPets(ps *apps.PetSet, cmd string) {
podList := p.getPodList(ps)
for _, pet := range podList.Items {
stdout, err := framework.RunHostCmd(pet.Namespace, pet.Name, cmd)
ExpectNoError(err)
framework.Logf("stdout %v on %v: %v", cmd, pet.Name, stdout)
}
}
func (p *petTester) saturate(ps *apps.PetSet) {
// TODO: Watch events and check that creation timestamps don't overlap
for i := 0; i < ps.Spec.Replicas; i++ {
framework.Logf("Waiting for pet at index " + fmt.Sprintf("%v", i+1) + " to enter Running")
p.waitForRunning(i+1, ps)
framework.Logf("Marking pet at index " + fmt.Sprintf("%v", i) + " healthy")
p.setHealthy(ps)
}
}
func (p *petTester) deletePetAtIndex(index int, ps *apps.PetSet) {
// TODO: we won't use "-index" as the name strategy forever,
// pull the name out from an identity mapper.
name := fmt.Sprintf("%v-%v", ps.Name, index)
noGrace := int64(0)
if err := p.c.Pods(ps.Namespace).Delete(name, &api.DeleteOptions{GracePeriodSeconds: &noGrace}); err != nil {
framework.Failf("Failed to delete pet %v for PetSet %v: %v", name, ps.Name, ps.Namespace, err)
}
}
func (p *petTester) restart(ps *apps.PetSet) {
name := ps.Name
ns := ps.Namespace
oldReplicas := ps.Spec.Replicas
p.update(ns, name, func(ps *apps.PetSet) { ps.Spec.Replicas = 0 })
var petList *api.PodList
pollErr := wait.PollImmediate(petsetPoll, petsetTimeout, func() (bool, error) {
petList = p.getPodList(ps)
if len(petList.Items) == 0 {
return true, nil
}
return false, nil
})
if pollErr != nil {
ts := []string{}
for _, pet := range petList.Items {
if pet.DeletionTimestamp != nil {
ts = append(ts, fmt.Sprintf("%v", pet.DeletionTimestamp.Time))
}
}
framework.Failf("Failed to scale petset down to 0, %d remaining pods with deletion timestamps: %v", len(petList.Items), ts)
}
p.update(ns, name, func(ps *apps.PetSet) { ps.Spec.Replicas = oldReplicas })
}
func (p *petTester) update(ns, name string, update func(ps *apps.PetSet)) {
for i := 0; i < 3; i++ {
ps, err := p.c.Apps().PetSets(ns).Get(name)
if err != nil {
framework.Failf("failed to get petset %q: %v", name, err)
}
update(ps)
ps, err = p.c.Apps().PetSets(ns).Update(ps)
if err == nil {
return
}
if !apierrs.IsConflict(err) && !apierrs.IsServerTimeout(err) {
framework.Failf("failed to update petset %q: %v", name, err)
}
}
framework.Failf("too many retries draining petset %q", name)
}
func (p *petTester) getPodList(ps *apps.PetSet) *api.PodList {
selector, err := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
ExpectNoError(err)
podList, err := p.c.Pods(ps.Namespace).List(api.ListOptions{LabelSelector: selector})
ExpectNoError(err)
return podList
}
func ExpectNoError(err error) {
Expect(err).NotTo(HaveOccurred())
}
func (p *petTester) confirmPetCount(count int, ps *apps.PetSet, timeout time.Duration) {
start := time.Now()
deadline := start.Add(timeout)
for t := time.Now(); t.Before(deadline); t = time.Now() {
podList := p.getPodList(ps)
petCount := len(podList.Items)
if petCount != count {
framework.Failf("PetSet %v scaled unexpectedly scaled to %d -> %d replicas: %+v", ps.Name, count, len(podList.Items), podList)
}
framework.Logf("Verifying petset %v doesn't scale past %d for another %+v", ps.Name, count, deadline.Sub(t))
time.Sleep(1 * time.Second)
}
}
func (p *petTester) waitForRunning(numPets int, ps *apps.PetSet) {
pollErr := wait.PollImmediate(petsetPoll, petsetTimeout,
func() (bool, error) {
podList := p.getPodList(ps)
if len(podList.Items) < numPets {
framework.Logf("Found %d pets, waiting for %d", len(podList.Items), numPets)
return false, nil
}
if len(podList.Items) > numPets {
return false, fmt.Errorf("Too many pods scheduled, expected %d got %d", numPods, len(podList.Items))
}
for _, p := range podList.Items {
if p.Status.Phase != api.PodRunning {
framework.Logf("Waiting for pod %v to enter %v, currently %v", p.Name, api.PodRunning, p.Status.Phase)
return false, nil
}
}
return true, nil
})
if pollErr != nil {
framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
}
}
func (p *petTester) setHealthy(ps *apps.PetSet) {
podList := p.getPodList(ps)
markedHealthyPod := ""
for _, pod := range podList.Items {
if pod.Status.Phase != api.PodRunning {
framework.Failf("Found pod in %v cannot set health", pod.Status.Phase)
}
if isInitialized(pod) {
continue
}
if markedHealthyPod != "" {
framework.Failf("Found multiple non-healthy pets: %v and %v", pod.Name, markedHealthyPod)
}
updated, err := framework.UpdatePodWithRetries(p.c, pod.Namespace, pod.Name, func(up *api.Pod) {
up.Annotations[petset.PetSetInitAnnotation] = "true"
})
ExpectNoError(err)
framework.Logf("Set annotation %v to %v on pod %v", petset.PetSetInitAnnotation, updated.Annotations[petset.PetSetInitAnnotation], pod.Name)
markedHealthyPod = pod.Name
}
}
func isInitialized(pod api.Pod) bool {
initialized, ok := pod.Annotations[petset.PetSetInitAnnotation]
if !ok {
return false
}
inited, err := strconv.ParseBool(initialized)
if err != nil {
framework.Failf("Couldn't parse petset init annotations %v", initialized)
}
return inited
}
func dec(i int64, exponent int) *inf.Dec {
return inf.NewDec(i, inf.Scale(-exponent))
}
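// Note on dec: inf.NewDec(unscaled, scale) represents unscaled × 10^(-scale),
// so negating the exponent gives the natural reading — dec(5, 3) is 5×10³ =
// 5000, and dec(1, 0) below is simply 1: a one-byte storage request,
// presumably just enough to exercise the alpha storage-class provisioning path.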
func newPVC(name string) api.PersistentVolumeClaim {
return api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Name: name,
Annotations: map[string]string{
"volume.alpha.kubernetes.io/storage-class": "anything",
},
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceStorage: resource.Quantity{
Amount: dec(1, 0),
Format: resource.BinarySI,
},
},
},
},
}
}
func newPetSet(name, ns, governingSvcName string, replicas int, petMounts []api.VolumeMount, podMounts []api.VolumeMount, labels map[string]string) *apps.PetSet {
mounts := append(petMounts, podMounts...)
claims := []api.PersistentVolumeClaim{}
for _, m := range petMounts {
claims = append(claims, newPVC(m.Name))
}
vols := []api.Volume{}
for _, m := range podMounts {
vols = append(vols, api.Volume{
Name: m.Name,
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: fmt.Sprintf("/tmp/%v", m.Name),
},
},
})
}
return &apps.PetSet{
TypeMeta: unversioned.TypeMeta{
Kind: "PetSet",
APIVersion: "apps/v1beta1",
},
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: apps.PetSetSpec{
Selector: &unversioned.LabelSelector{
MatchLabels: labels,
},
Replicas: replicas,
Template: api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: labels,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "nginx",
Image: "gcr.io/google_containers/nginx-slim:0.5",
VolumeMounts: mounts,
},
},
Volumes: vols,
},
},
VolumeClaimTemplates: claims,
ServiceName: governingSvcName,
},
}
}