mirror of https://github.com/k3s-io/k3s
Remove GetAPIPodStatus usage
parent
17a5058e83
commit
41b12a18d9
|
@ -1616,10 +1616,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
|
|||
}
|
||||
}
|
||||
|
||||
apiPodStatus, err := kl.generatePodStatus(pod, podStatus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
apiPodStatus := kl.generatePodStatus(pod, podStatus)
|
||||
// Record the time it takes for the pod to become running.
|
||||
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
|
||||
if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
|
||||
|
@ -1795,11 +1792,12 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
|
|||
}
|
||||
status, found := kl.statusManager.GetPodStatus(pod.UID)
|
||||
if !found {
|
||||
statusPtr, err := kl.containerRuntime.GetAPIPodStatus(pod)
|
||||
// TODO(random-liu): Cleanup status get functions. (issue #20477)
|
||||
s, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
status = *statusPtr
|
||||
status = kl.generatePodStatus(pod, s)
|
||||
}
|
||||
if status.Phase == api.PodRunning {
|
||||
possibleCIDRs.Insert(fmt.Sprintf("%s/32", status.PodIP))
|
||||
|
@ -3088,7 +3086,7 @@ func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus,
|
|||
return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
}
|
||||
|
||||
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) (api.PodStatus, error) {
|
||||
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
|
||||
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
|
||||
// TODO: Consider include the container information.
|
||||
if kl.pastActiveDeadline(pod) {
|
||||
|
@ -3097,7 +3095,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
|
|||
return api.PodStatus{
|
||||
Phase: api.PodFailed,
|
||||
Reason: reason,
|
||||
Message: "Pod was active on the node longer than specified deadline"}, nil
|
||||
Message: "Pod was active on the node longer than specified deadline"}
|
||||
}
|
||||
|
||||
s := kl.convertStatusToAPIStatus(pod, podStatus)
|
||||
|
@ -3120,7 +3118,7 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
|
|||
}
|
||||
}
|
||||
|
||||
return *s, nil
|
||||
return *s
|
||||
}
|
||||
|
||||
// TODO(random-liu): Move this to some better place.
|
||||
|
|
|
@ -4118,13 +4118,13 @@ func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestCleanupBandwidthLimits(t *testing.T) {
|
||||
// TODO(random-liu): We removed the test case for pod status not cached here. We should add a higher
|
||||
// layer status getter function and test that function instead.
|
||||
tests := []struct {
|
||||
status *api.PodStatus
|
||||
pods []*api.Pod
|
||||
inputCIDRs []string
|
||||
expectResetCIDRs []string
|
||||
cacheStatus bool
|
||||
expectedCalls []string
|
||||
name string
|
||||
}{
|
||||
{
|
||||
|
@ -4149,35 +4149,8 @@ func TestCleanupBandwidthLimits(t *testing.T) {
|
|||
},
|
||||
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectedCalls: []string{"GetAPIPodStatus"},
|
||||
name: "pod running",
|
||||
},
|
||||
{
|
||||
status: &api.PodStatus{
|
||||
PodIP: "1.2.3.4",
|
||||
Phase: api.PodRunning,
|
||||
},
|
||||
pods: []*api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Annotations: map[string]string{
|
||||
"kubernetes.io/ingress-bandwidth": "10M",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectedCalls: []string{},
|
||||
cacheStatus: true,
|
||||
name: "pod running with cache",
|
||||
},
|
||||
{
|
||||
status: &api.PodStatus{
|
||||
PodIP: "1.2.3.4",
|
||||
|
@ -4200,7 +4173,6 @@ func TestCleanupBandwidthLimits(t *testing.T) {
|
|||
},
|
||||
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectedCalls: []string{"GetAPIPodStatus"},
|
||||
name: "pod not running",
|
||||
},
|
||||
{
|
||||
|
@ -4208,32 +4180,6 @@ func TestCleanupBandwidthLimits(t *testing.T) {
|
|||
PodIP: "1.2.3.4",
|
||||
Phase: api.PodFailed,
|
||||
},
|
||||
pods: []*api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
Annotations: map[string]string{
|
||||
"kubernetes.io/ingress-bandwidth": "10M",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
|
||||
expectedCalls: []string{},
|
||||
cacheStatus: true,
|
||||
name: "pod not running with cache",
|
||||
},
|
||||
{
|
||||
status: &api.PodStatus{
|
||||
PodIP: "1.2.3.4",
|
||||
Phase: api.PodRunning,
|
||||
},
|
||||
pods: []*api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
|
@ -4258,12 +4204,9 @@ func TestCleanupBandwidthLimits(t *testing.T) {
|
|||
|
||||
testKube := newTestKubelet(t)
|
||||
testKube.kubelet.shaper = shaper
|
||||
testKube.fakeRuntime.APIPodStatus = *test.status
|
||||
|
||||
if test.cacheStatus {
|
||||
for _, pod := range test.pods {
|
||||
testKube.kubelet.statusManager.SetPodStatus(pod, *test.status)
|
||||
}
|
||||
for _, pod := range test.pods {
|
||||
testKube.kubelet.statusManager.SetPodStatus(pod, *test.status)
|
||||
}
|
||||
|
||||
err := testKube.kubelet.cleanupBandwidthLimits(test.pods)
|
||||
|
@ -4273,14 +4216,6 @@ func TestCleanupBandwidthLimits(t *testing.T) {
|
|||
if !reflect.DeepEqual(shaper.ResetCIDRs, test.expectResetCIDRs) {
|
||||
t.Errorf("[%s]\nexpected: %v, saw: %v", test.name, test.expectResetCIDRs, shaper.ResetCIDRs)
|
||||
}
|
||||
|
||||
if test.cacheStatus {
|
||||
if len(testKube.fakeRuntime.CalledFunctions) != 0 {
|
||||
t.Errorf("unexpected function calls: %v", testKube.fakeRuntime.CalledFunctions)
|
||||
}
|
||||
} else if !reflect.DeepEqual(testKube.fakeRuntime.CalledFunctions, test.expectedCalls) {
|
||||
t.Errorf("[%s], expected %v, saw %v", test.name, test.expectedCalls, testKube.fakeRuntime.CalledFunctions)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/intstr"
|
||||
)
|
||||
|
||||
|
@ -35,7 +36,7 @@ type HandlerRunner struct {
|
|||
}
|
||||
|
||||
type podStatusProvider interface {
|
||||
GetAPIPodStatus(pod *api.Pod) (*api.PodStatus, error)
|
||||
GetPodStatus(uid types.UID, name, namespace string) (*kubecontainer.PodStatus, error)
|
||||
}
|
||||
|
||||
func NewHandlerRunner(httpGetter kubetypes.HttpGetter, commandRunner kubecontainer.ContainerCommandRunner, containerManager podStatusProvider) kubecontainer.HandlerRunner {
|
||||
|
@ -86,15 +87,15 @@ func resolvePort(portReference intstr.IntOrString, container *api.Container) (in
|
|||
func (hr *HandlerRunner) runHTTPHandler(pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
||||
host := handler.HTTPGet.Host
|
||||
if len(host) == 0 {
|
||||
status, err := hr.containerManager.GetAPIPodStatus(pod)
|
||||
status, err := hr.containerManager.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to get pod info, event handlers may be invalid.")
|
||||
return err
|
||||
}
|
||||
if status.PodIP == "" {
|
||||
if status.IP == "" {
|
||||
return fmt.Errorf("failed to find networking container: %v", status)
|
||||
}
|
||||
host = status.PodIP
|
||||
host = status.IP
|
||||
}
|
||||
var port int
|
||||
if handler.HTTPGet.Port.Type == intstr.String && len(handler.HTTPGet.Port.StrVal) == 0 {
|
||||
|
|
|
@ -23,7 +23,7 @@ import (
|
|||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||
)
|
||||
|
@ -89,10 +89,10 @@ func (kl *Kubelet) runOnce(pods []*api.Pod, retryDelay time.Duration) (results [
|
|||
results = append(results, res)
|
||||
if res.Err != nil {
|
||||
// TODO(proppy): report which containers failed the pod.
|
||||
glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err)
|
||||
failedPods = append(failedPods, res.Pod.Name)
|
||||
glog.Infof("failed to start pod %q: %v", format.Pod(res.Pod), res.Err)
|
||||
failedPods = append(failedPods, format.Pod(res.Pod))
|
||||
} else {
|
||||
glog.Infof("started pod %q", res.Pod.Name)
|
||||
glog.Infof("started pod %q", format.Pod(res.Pod))
|
||||
}
|
||||
}
|
||||
if len(failedPods) > 0 {
|
||||
|
@ -107,25 +107,17 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
|
|||
delay := retryDelay
|
||||
retry := 0
|
||||
for {
|
||||
pods, err := kl.containerRuntime.GetPods(false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get kubelet pods: %v", err)
|
||||
}
|
||||
p := container.Pods(pods).FindPodByID(pod.UID)
|
||||
running, err := kl.isPodRunning(pod, p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check pod status: %v", err)
|
||||
}
|
||||
if running {
|
||||
glog.Infof("pod %q containers running", pod.Name)
|
||||
return nil
|
||||
}
|
||||
glog.Infof("pod %q containers not running: syncing", pod.Name)
|
||||
|
||||
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to get status for pod %q: %v", pod.Name, err)
|
||||
return fmt.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
|
||||
if kl.isPodRunning(pod, status) {
|
||||
glog.Infof("pod %q containers running", format.Pod(pod))
|
||||
return nil
|
||||
}
|
||||
glog.Infof("pod %q containers not running: syncing", format.Pod(pod))
|
||||
|
||||
glog.Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
|
||||
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
|
||||
glog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err)
|
||||
|
@ -133,13 +125,13 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
|
|||
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
|
||||
|
||||
if err = kl.syncPod(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
|
||||
return fmt.Errorf("error syncing pod: %v", err)
|
||||
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
if retry >= runOnceMaxRetries {
|
||||
return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, runOnceMaxRetries)
|
||||
return fmt.Errorf("timeout error: pod %q containers not running after %d retries", format.Pod(pod), runOnceMaxRetries)
|
||||
}
|
||||
// TODO(proppy): health checking would be better than waiting + checking the state at the next iteration.
|
||||
glog.Infof("pod %q containers synced, waiting for %v", pod.Name, delay)
|
||||
glog.Infof("pod %q containers synced, waiting for %v", format.Pod(pod), delay)
|
||||
time.Sleep(delay)
|
||||
retry++
|
||||
delay *= runOnceRetryDelayBackoff
|
||||
|
@ -147,18 +139,13 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
|
|||
}
|
||||
|
||||
// isPodRunning returns true if all containers of a manifest are running.
|
||||
func (kl *Kubelet) isPodRunning(pod *api.Pod, runningPod container.Pod) (bool, error) {
|
||||
// TODO(random-liu): Change this to new pod status
|
||||
status, err := kl.containerRuntime.GetAPIPodStatus(pod)
|
||||
if err != nil {
|
||||
glog.Infof("Failed to get the status of pod %q: %v", format.Pod(pod), err)
|
||||
return false, err
|
||||
}
|
||||
for _, st := range status.ContainerStatuses {
|
||||
if st.State.Running == nil {
|
||||
glog.Infof("Container %q not running: %#v", st.Name, st.State)
|
||||
return false, nil
|
||||
func (kl *Kubelet) isPodRunning(pod *api.Pod, status *kubecontainer.PodStatus) bool {
|
||||
for _, c := range pod.Spec.Containers {
|
||||
cs := status.FindContainerStatusByName(c.Name)
|
||||
if cs == nil || cs.State != kubecontainer.ContainerStateRunning {
|
||||
glog.Infof("Container %q for pod %q not running", c.Name, format.Pod(pod))
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet/status"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
||||
)
|
||||
|
||||
|
@ -67,6 +68,8 @@ func TestRunOnce(t *testing.T) {
|
|||
volumeManager: newVolumeManager(),
|
||||
diskSpaceManager: diskSpaceManager,
|
||||
containerRuntime: fakeRuntime,
|
||||
reasonCache: NewReasonCache(),
|
||||
clock: util.RealClock{},
|
||||
}
|
||||
kb.containerManager = cm.NewStubContainerManager()
|
||||
|
||||
|
@ -90,6 +93,20 @@ func TestRunOnce(t *testing.T) {
|
|||
},
|
||||
}
|
||||
podManager.SetPods(pods)
|
||||
// The original test here is totally meaningless, because fakeruntime will always return an empty podStatus. While
|
||||
// the originial logic of isPodRunning happens to return true when podstatus is empty, so the test can always pass.
|
||||
// Now the logic in isPodRunning is changed, to let the test pass, we set the podstatus directly in fake runtime.
|
||||
// This is also a meaningless test, because the isPodRunning will also always return true after setting this. However,
|
||||
// because runonce is never used in kubernetes now, we should deprioritize the cleanup work.
|
||||
// TODO(random-liu) Fix the test, make it meaningful.
|
||||
fakeRuntime.PodStatus = kubecontainer.PodStatus{
|
||||
ContainerStatuses: []*kubecontainer.ContainerStatus{
|
||||
{
|
||||
Name: "bar",
|
||||
State: kubecontainer.ContainerStateRunning,
|
||||
},
|
||||
},
|
||||
}
|
||||
results, err := kb.runOnce(pods, time.Millisecond)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
|
|
Loading…
Reference in New Issue