Merge pull request #13874 from timstclair/status-manager

Auto commit by PR queue bot
k8s-merge-robot 2015-09-16 00:46:01 -07:00
commit f6fb0a58b6
5 changed files with 84 additions and 58 deletions

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/rkt"
"k8s.io/kubernetes/pkg/kubelet/status"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeletUtil "k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/labels"
@ -241,7 +242,7 @@ func NewMainKubelet(
if err != nil {
return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
}
statusManager := newStatusManager(kubeClient)
statusManager := status.NewManager(kubeClient)
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
@ -501,7 +502,7 @@ type Kubelet struct {
machineInfo *cadvisorApi.MachineInfo
// Syncs pod statuses with the apiserver; also used as a cache of statuses.
statusManager *statusManager
statusManager status.Manager
// Manager for the volume maps for the pods.
volumeManager *volumeManager
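
One payoff of changing the statusManager field from the concrete *statusManager type to the status.Manager interface is that tests and other callers can substitute a lightweight fake. A hedged sketch of such a fake, illustrative only and not part of this PR:

package kubelet

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/types"
)

// fakeStatusManager is an in-memory stand-in for status.Manager that never
// talks to the apiserver. Purely illustrative.
type fakeStatusManager struct {
	statuses map[types.UID]api.PodStatus
}

// Compile-time check that the fake satisfies the new interface.
var _ status.Manager = &fakeStatusManager{}

func newFakeStatusManager() *fakeStatusManager {
	return &fakeStatusManager{statuses: make(map[types.UID]api.PodStatus)}
}

func (f *fakeStatusManager) Start() {}

func (f *fakeStatusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
	s, ok := f.statuses[uid]
	return s, ok
}

func (f *fakeStatusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
	f.statuses[pod.UID] = status
}

func (f *fakeStatusManager) TerminatePods(pods []*api.Pod) bool { return true }

func (f *fakeStatusManager) DeletePodStatus(uid types.UID) {
	delete(f.statuses, uid)
}

func (f *fakeStatusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
	for uid := range f.statuses {
		if !podUIDs[uid] {
			delete(f.statuses, uid)
		}
	}
}
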

View File

@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/container"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
@ -105,7 +106,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.nodeLister = testNodeLister{}
kubelet.readinessManager = kubecontainer.NewReadinessManager()
kubelet.recorder = fakeRecorder
kubelet.statusManager = newStatusManager(fakeKubeClient)
kubelet.statusManager = status.NewManager(fakeKubeClient)
if err := kubelet.setupDataDirs(); err != nil {
t.Fatalf("can't initialize kubelet data dirs: %v", err)
}

View File

@ -30,6 +30,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/status"
)
type listContainersResult struct {
@ -83,7 +84,7 @@ func TestRunOnce(t *testing.T) {
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
nodeLister: testNodeLister{},
statusManager: newStatusManager(nil),
statusManager: status.NewManager(nil),
containerRefManager: kubecontainer.NewRefManager(),
readinessManager: kubecontainer.NewReadinessManager(),
podManager: podManager,

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package status
import (
"fmt"
@ -39,16 +39,42 @@ type podStatusSyncRequest struct {
// Updates pod statuses in apiserver. Writes only when new status has changed.
// All methods are thread-safe.
type statusManager struct {
type manager struct {
kubeClient client.Interface
// Map from pod full name to sync status of the corresponding pod.
podStatusesLock sync.RWMutex
podStatuses map[types.UID]api.PodStatus
podStatusesLock sync.RWMutex
podStatusChannel chan podStatusSyncRequest
}
func newStatusManager(kubeClient client.Interface) *statusManager {
return &statusManager{
// status.Manager is the source of truth for kubelet pod status, and should be kept up-to-date with
// the latest api.PodStatus. It also syncs updates back to the API server.
type Manager interface {
// Start the API server status sync loop.
Start()
// GetPodStatus returns the cached status for the provided pod UID, as well as whether it
// was a cache hit.
GetPodStatus(uid types.UID) (api.PodStatus, bool)
// SetPodStatus updates the cached status for the given pod, and triggers a status update.
SetPodStatus(pod *api.Pod, status api.PodStatus)
// TerminatePods resets the container status for the provided pods to terminated and triggers
// a status update. This function may not enqueue all the provided pods, in which case it will
// return false.
TerminatePods(pods []*api.Pod) bool
// DeletePodStatus simply removes the given pod from the status cache.
DeletePodStatus(uid types.UID)
// RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
// the provided podUIDs.
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
func NewManager(kubeClient client.Interface) Manager {
return &manager{
kubeClient: kubeClient,
podStatuses: make(map[types.UID]api.PodStatus),
podStatusChannel: make(chan podStatusSyncRequest, 1000), // Buffer up to 1000 statuses
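
For orientation, a minimal sketch of how a caller can exercise the interface above. It constructs the manager with a nil client, as runonce_test.go below does, so the sync loop is skipped and the manager acts as a pure status cache; the pod values are illustrative:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/types"
)

func main() {
	m := status.NewManager(nil)
	m.Start() // with a nil client this just logs and returns (see Start below)

	pod := &api.Pod{ObjectMeta: api.ObjectMeta{UID: "illustrative-uid", Name: "foo", Namespace: "default"}}

	// Cache a status; a new or changed status is also queued for syncing.
	m.SetPodStatus(pod, api.PodStatus{Phase: api.PodRunning})

	// Read the cached copy back.
	if cached, ok := m.GetPodStatus(pod.UID); ok {
		fmt.Println("cached phase:", cached.Phase)
	}

	// Drop cache entries for pods the kubelet no longer knows about.
	m.RemoveOrphanedStatuses(map[types.UID]bool{pod.UID: true})
}
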
@ -65,35 +91,35 @@ func isStatusEqual(oldStatus, status *api.PodStatus) bool {
return reflect.DeepEqual(status, oldStatus)
}
func (s *statusManager) Start() {
func (m *manager) Start() {
// Don't start the status manager if we don't have a client. This will happen
// on the master, where the kubelet is responsible for bootstrapping the pods
// of the master components.
if s.kubeClient == nil {
if m.kubeClient == nil {
glog.Infof("Kubernetes client is nil, not starting status manager.")
return
}
// syncBatch blocks when no updates are available, so we can run it in a tight loop.
glog.Info("Starting to sync pod status with apiserver")
go util.Until(func() {
err := s.syncBatch()
err := m.syncBatch()
if err != nil {
glog.Warningf("Failed to updated pod status: %v", err)
}
}, 0, util.NeverStop)
}
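
The zero period passed to util.Until above only works because syncBatch blocks on a channel receive rather than polling; a stripped-down sketch of the same pattern with an illustrative work channel:

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

func main() {
	work := make(chan string, 1000)

	// Same shape as Start: period 0 does not busy-loop because the body
	// blocks on <-work until an item is queued.
	go util.Until(func() {
		item := <-work
		fmt.Println("processing", item) // stand-in for syncBatch's real work
	}, 0, util.NeverStop)

	work <- "one status update"
	time.Sleep(100 * time.Millisecond) // give the illustrative loop time to run
}
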
func (s *statusManager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
s.podStatusesLock.RLock()
defer s.podStatusesLock.RUnlock()
status, ok := s.podStatuses[uid]
func (m *manager) GetPodStatus(uid types.UID) (api.PodStatus, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
status, ok := m.podStatuses[uid]
return status, ok
}
func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
oldStatus, found := s.podStatuses[pod.UID]
func (m *manager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
oldStatus, found := m.podStatuses[pod.UID]
// ensure that the start time does not change across updates.
if found && oldStatus.StartTime != nil {
@ -102,7 +128,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
// if the status has no start time, we need to set an initial time
// TODO(yujuhong): Consider setting StartTime when generating the pod
// status instead, which would allow statusManager to become a simple cache
// status instead, which would allow manager to become a simple cache
// again.
if status.StartTime.IsZero() {
if pod.Status.StartTime.IsZero() {
@ -123,20 +149,17 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
// workers and/or the kubelet but dropping the lock before sending the
// status down the channel feels like an easy way to get a bullet in the foot.
if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
s.podStatuses[pod.UID] = status
s.podStatusChannel <- podStatusSyncRequest{pod, status}
m.podStatuses[pod.UID] = status
m.podStatusChannel <- podStatusSyncRequest{pod, status}
} else {
glog.V(3).Infof("Ignoring same status for pod %q, status: %+v", kubeletUtil.FormatPodName(pod), status)
}
}
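
Two behaviours are encoded above: the start time is pinned to the first value observed, and a status identical to the cached one is not re-queued. A small sketch of both from the caller's side, reusing the nil-client construction from the earlier example with illustrative values:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/util"
)

func main() {
	m := status.NewManager(nil)
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{UID: "illustrative-uid", Name: "foo", Namespace: "default"}}

	now := util.Now()
	first := api.PodStatus{Phase: api.PodRunning, StartTime: &now}
	m.SetPodStatus(pod, first) // cached and queued

	second := api.PodStatus{Phase: api.PodFailed} // no StartTime set
	m.SetPodStatus(pod, second)                   // queued; StartTime is carried over from `first`
	m.SetPodStatus(pod, second)                   // identical to the cached status, so ignored

	cached, _ := m.GetPodStatus(pod.UID)
	fmt.Println(cached.Phase, cached.StartTime) // Failed, still the original start time
}
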
// TerminatePods resets the container status for the provided pods to terminated and triggers
// a status update. This function may not enqueue all the provided pods, in which case it will
// return false
func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
func (m *manager) TerminatePods(pods []*api.Pod) bool {
sent := true
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
for _, pod := range pods {
for i := range pod.Status.ContainerStatuses {
pod.Status.ContainerStatuses[i].State = api.ContainerState{
@ -144,7 +167,7 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
}
}
select {
case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
case m.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
default:
sent = false
glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
@ -153,27 +176,27 @@ func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
return sent
}
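
Since the send above is non-blocking, callers are expected to inspect the boolean result when the channel is full; a hedged sketch of that caller-side handling (the logging and retry policy are illustrative, not something this PR prescribes):

package main

import (
	"log"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/kubelet/status"
)

func main() {
	m := status.NewManager(nil)
	pods := []*api.Pod{
		{ObjectMeta: api.ObjectMeta{UID: "illustrative-uid", Name: "foo", Namespace: "default"}},
	}

	if !m.TerminatePods(pods) {
		// At least one termination notice was dropped because the buffered
		// status channel was full; a caller would typically retry on its
		// next sync pass.
		log.Print("not all termination notices were enqueued; will retry later")
	}
}
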
func (s *statusManager) DeletePodStatus(uid types.UID) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
delete(s.podStatuses, uid)
func (m *manager) DeletePodStatus(uid types.UID) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
delete(m.podStatuses, uid)
}
// TODO(filipg): It'd be cleaner if we could do this without a signal from the user.
func (s *statusManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()
for key := range s.podStatuses {
func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
for key := range m.podStatuses {
if _, ok := podUIDs[key]; !ok {
glog.V(5).Infof("Removing %q from status map.", key)
delete(s.podStatuses, key)
delete(m.podStatuses, key)
}
}
}
// syncBatch syncs pod statuses with the apiserver.
func (s *statusManager) syncBatch() error {
syncRequest := <-s.podStatusChannel
func (m *manager) syncBatch() error {
syncRequest := <-m.podStatusChannel
pod := syncRequest.pod
status := syncRequest.status
@ -182,7 +205,7 @@ func (s *statusManager) syncBatch() error {
ObjectMeta: pod.ObjectMeta,
}
// TODO: make me easier to express from client code
statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
statusPod, err = m.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
if errors.IsNotFound(err) {
glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
return nil
@ -194,7 +217,7 @@ func (s *statusManager) syncBatch() error {
}
statusPod.Status = status
// TODO: handle conflict as a retry, make that easier too.
statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
statusPod, err = m.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
@ -205,9 +228,9 @@ func (s *statusManager) syncBatch() error {
glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
return nil
}
if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
if err := m.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
s.DeletePodStatus(pod.UID)
m.DeletePodStatus(pod.UID)
return nil
}
}
@ -220,7 +243,7 @@ func (s *statusManager) syncBatch() error {
// is full, and the pod worker holding the lock is waiting on this method
// to clear the channel. Even if this delete never runs, subsequent container
// changes on the node should trigger updates.
go s.DeletePodStatus(pod.UID)
go m.DeletePodStatus(pod.UID)
return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
}
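
One of the TODOs above asks for conflict handling as a retry. A hedged sketch of what that could look like, wrapping the same Get/UpdateStatus pair syncBatch already uses; the helper name, retry limit, and import paths are assumptions for this tree, not part of the PR:

package status

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	client "k8s.io/kubernetes/pkg/client/unversioned"
)

// updateStatusWithRetry re-reads the pod and reapplies the desired status
// whenever UpdateStatus reports a conflict, up to maxRetries attempts.
// Illustrative only.
func updateStatusWithRetry(c client.Interface, namespace, name string, desired api.PodStatus, maxRetries int) error {
	var lastErr error
	for i := 0; i < maxRetries; i++ {
		pod, err := c.Pods(namespace).Get(name)
		if err != nil {
			return err
		}
		pod.Status = desired
		_, err = c.Pods(namespace).UpdateStatus(pod)
		if err == nil {
			return nil
		}
		if !errors.IsConflict(err) {
			return err
		}
		lastErr = err
	}
	return fmt.Errorf("status update for %s/%s still conflicting after %d attempts: %v", namespace, name, maxRetries, lastErr)
}

syncBatch could then call such a helper instead of returning an error straight away when UpdateStatus hits a conflict.
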

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
package status
import (
"fmt"
@ -37,8 +37,8 @@ var testPod *api.Pod = &api.Pod{
},
}
func newTestStatusManager() *statusManager {
return newStatusManager(&testclient.Fake{})
func newTestManager() *manager {
return NewManager(&testclient.Fake{}).(*manager)
}
func generateRandomMessage() string {
@ -66,7 +66,7 @@ func verifyActions(t *testing.T, kubeClient client.Interface, expectedActions []
}
}
func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
func verifyUpdates(t *testing.T, manager *manager, expectedUpdates int) {
// Consume all updates in the channel.
numUpdates := 0
for {
@ -89,7 +89,7 @@ func verifyUpdates(t *testing.T, manager *statusManager, expectedUpdates int) {
}
func TestNewStatus(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 1)
@ -100,7 +100,7 @@ func TestNewStatus(t *testing.T) {
}
func TestNewStatusPreservesPodStartTime(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
@ -121,14 +121,14 @@ func TestNewStatusPreservesPodStartTime(t *testing.T) {
}
func TestChangedStatus(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
syncer.SetPodStatus(testPod, getRandomPodStatus())
syncer.SetPodStatus(testPod, getRandomPodStatus())
verifyUpdates(t, syncer, 2)
}
func TestChangedStatusKeepsStartTime(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
now := util.Now()
firstStatus := getRandomPodStatus()
firstStatus.StartTime = &now
@ -145,7 +145,7 @@ func TestChangedStatusKeepsStartTime(t *testing.T) {
}
func TestUnchangedStatus(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
podStatus := getRandomPodStatus()
syncer.SetPodStatus(testPod, podStatus)
syncer.SetPodStatus(testPod, podStatus)
@ -153,7 +153,7 @@ func TestUnchangedStatus(t *testing.T) {
}
func TestSyncBatchIgnoresNotFound(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
syncer.SetPodStatus(testPod, getRandomPodStatus())
err := syncer.syncBatch()
if err != nil {
@ -165,7 +165,7 @@ func TestSyncBatchIgnoresNotFound(t *testing.T) {
}
func TestSyncBatch(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
syncer.kubeClient = testclient.NewSimpleFake(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
err := syncer.syncBatch()
@ -180,7 +180,7 @@ func TestSyncBatch(t *testing.T) {
}
func TestSyncBatchChecksMismatchedUID(t *testing.T) {
syncer := newTestStatusManager()
syncer := newTestManager()
testPod.UID = "first"
differentPod := *testPod
differentPod.UID = "second"