mirror of https://github.com/k3s-io/k3s
Merge pull request #10313 from yujuhong/kubelet_delete
Add an e2e test to verify that pods are deleted on nodes

commit 700a6441de
@@ -284,6 +284,24 @@ func (p *Pod) FindContainerByName(containerName string) *Container {
 	return nil
 }
 
+// ToAPIPod converts Pod to api.Pod. Note that if a field in api.Pod has no
+// corresponding field in Pod, the field would not be populated.
+func (p *Pod) ToAPIPod() *api.Pod {
+	var pod api.Pod
+	pod.UID = p.ID
+	pod.Name = p.Name
+	pod.Namespace = p.Namespace
+	pod.Status = p.Status
+
+	for _, c := range p.Containers {
+		var container api.Container
+		container.Name = c.Name
+		container.Image = c.Image
+		pod.Spec.Containers = append(pod.Spec.Containers, container)
+	}
+	return &pod
+}
+
 // IsEmpty returns true if the pod is empty.
 func (p *Pod) IsEmpty() bool {
 	return reflect.DeepEqual(p, &Pod{})

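ToAPIPod is deliberately lossy: only fields present in both kubecontainer.Pod and api.Pod survive the conversion. A self-contained sketch of the same pattern — the simplified RuntimePod/APIPod types below are illustrative stand-ins, not the real kubecontainer.Pod or api.Pod:

package main

import "fmt"

// Illustrative stand-ins for kubecontainer.Pod and api.Pod; the real types
// carry many more fields.
type RuntimeContainer struct{ Name, Image, Hash string }

type RuntimePod struct {
	ID, Name, Namespace string
	Containers          []RuntimeContainer
}

type APIContainer struct{ Name, Image string }

type APIPod struct {
	UID, Name, Namespace string
	Containers           []APIContainer
}

// toAPIPod copies only the fields both types share; anything else, such as
// the runtime container Hash, is dropped, mirroring ToAPIPod's contract.
func (p *RuntimePod) toAPIPod() *APIPod {
	pod := &APIPod{UID: p.ID, Name: p.Name, Namespace: p.Namespace}
	for _, c := range p.Containers {
		pod.Containers = append(pod.Containers, APIContainer{Name: c.Name, Image: c.Image})
	}
	return pod
}

func main() {
	rp := &RuntimePod{
		ID: "uid-1", Name: "web", Namespace: "default",
		Containers: []RuntimeContainer{{Name: "nginx", Image: "nginx:1.7", Hash: "abc123"}},
	}
	fmt.Printf("%+v\n", rp.toAPIPod()) // the Hash never makes it across
}

Anything the runtime cannot reconstruct (labels, the full spec) is simply absent, which is why the e2e test later in this commit matches pods by name prefix rather than label selector.
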
@@ -1851,6 +1851,23 @@ func (kl *Kubelet) GetPods() []*api.Pod {
 	return kl.podManager.GetPods()
 }
 
+// GetRunningPods returns all pods running on kubelet from looking at the
+// container runtime cache. This function converts kubecontainer.Pod to
+// api.Pod, so only the fields that exist in both kubecontainer.Pod and
+// api.Pod are considered meaningful.
+func (kl *Kubelet) GetRunningPods() ([]*api.Pod, error) {
+	pods, err := kl.runtimeCache.GetPods()
+	if err != nil {
+		return nil, err
+	}
+
+	apiPods := make([]*api.Pod, 0, len(pods))
+	for _, pod := range pods {
+		apiPods = append(apiPods, pod.ToAPIPod())
+	}
+	return apiPods, nil
+}
+
 func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.Pod, bool) {
 	return kl.podManager.GetPodByFullName(podFullName)
 }

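GetPods and GetRunningPods answer different questions: the pod manager holds the desired state handed down by the API server, while the runtime cache reflects what the container runtime is actually running. A toy sketch of that split, with hypothetical store and cache interfaces standing in for podManager and runtimeCache:

package main

import "fmt"

// Hypothetical stand-ins for the kubelet's podManager and runtimeCache.
type desiredStore interface{ List() []string }
type runtimeCache interface{ List() ([]string, error) }

type kubelet struct {
	store desiredStore // what the API server asked this node to run
	cache runtimeCache // what the container runtime reports as running
}

// Desired pods come from local bookkeeping, so there is no error path.
func (k *kubelet) GetPods() []string { return k.store.List() }

// Running pods require querying the runtime (or its cache), which can fail.
func (k *kubelet) GetRunningPods() ([]string, error) { return k.cache.List() }

type staticStore []string

func (s staticStore) List() []string { return s }

type staticCache []string

func (s staticCache) List() ([]string, error) { return s, nil }

func main() {
	k := &kubelet{
		store: staticStore{"default/web-1", "default/web-2"},
		cache: staticCache{"default/web-1"}, // web-2 not started yet
	}
	fmt.Println(k.GetPods())
	if running, err := k.GetRunningPods(); err == nil {
		fmt.Println(running)
	}
}

The asymmetry in signatures (no error vs. error) falls straight out of where each answer comes from.
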
@@ -101,6 +101,7 @@ type HostInterface interface {
 	GetRawContainerInfo(containerName string, req *cadvisorApi.ContainerInfoRequest, subcontainers bool) (map[string]*cadvisorApi.ContainerInfo, error)
 	GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
 	GetPods() []*api.Pod
+	GetRunningPods() ([]*api.Pod, error)
 	GetPodByName(namespace, name string) (*api.Pod, bool)
 	RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
 	ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error

@@ -148,6 +149,8 @@ func (s *Server) InstallDebuggingHandlers() {
 	s.mux.HandleFunc("/logs/", s.handleLogs)
 	s.mux.HandleFunc("/containerLogs/", s.handleContainerLogs)
 	s.mux.Handle("/metrics", prometheus.Handler())
+	// The /runningpods endpoint is used for testing only.
+	s.mux.HandleFunc("/runningpods", s.handleRunningPods)
 
 	s.mux.HandleFunc("/debug/pprof/", pprof.Index)
 	s.mux.HandleFunc("/debug/pprof/profile", pprof.Profile)

@@ -280,14 +283,38 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
 	}
 }
 
-// handlePods returns a list of pod bound to the Kubelet and their spec
-func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
-	pods := s.host.GetPods()
+// encodePods creates an api.PodList object from pods and returns the encoded
+// PodList.
+func encodePods(pods []*api.Pod) (data []byte, err error) {
 	podList := new(api.PodList)
 	for _, pod := range pods {
 		podList.Items = append(podList.Items, *pod)
 	}
-	data, err := latest.Codec.Encode(podList)
+	return latest.Codec.Encode(podList)
+}
+
+// handlePods returns a list of pods bound to the Kubelet and their spec.
+func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
+	pods := s.host.GetPods()
+	data, err := encodePods(pods)
+	if err != nil {
+		s.error(w, err)
+		return
+	}
+	w.Header().Add("Content-type", "application/json")
+	w.Write(data)
+}
+
+// handleRunningPods returns a list of pods running on Kubelet. The list is
+// provided by the container runtime, and is different from the list returned
+// by handlePods, which is a set of desired pods to run.
+func (s *Server) handleRunningPods(w http.ResponseWriter, req *http.Request) {
+	pods, err := s.host.GetRunningPods()
+	if err != nil {
+		s.error(w, err)
+		return
+	}
+	data, err := encodePods(pods)
 	if err != nil {
 		s.error(w, err)
 		return

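Since the handler writes a JSON-encoded api.PodList, any HTTP client can inspect it. A rough sketch of querying the endpoint directly — the kubelet address, the port, and the v1-style metadata field names are assumptions here; a real cluster typically reaches this through the apiserver proxy with proper auth:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// Minimal projection of the JSON-encoded api.PodList; only the fields this
// sketch needs are declared, and the metadata layout is an assumption.
type podList struct {
	Items []struct {
		Metadata struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
		} `json:"metadata"`
	} `json:"items"`
}

func main() {
	// Assumed kubelet address; adjust host, port, and auth for a real cluster.
	resp, err := http.Get("http://127.0.0.1:10250/runningpods")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var pods podList
	if err := json.NewDecoder(resp.Body).Decode(&pods); err != nil {
		log.Fatal(err)
	}
	for _, p := range pods.Items {
		fmt.Printf("%s/%s\n", p.Metadata.Namespace, p.Metadata.Name)
	}
}
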
@@ -47,6 +47,7 @@ type fakeKubelet struct {
 	rawInfoFunc          func(query *cadvisorApi.ContainerInfoRequest) (map[string]*cadvisorApi.ContainerInfo, error)
 	machineInfoFunc      func() (*cadvisorApi.MachineInfo, error)
 	podsFunc             func() []*api.Pod
+	runningPodsFunc      func() ([]*api.Pod, error)
 	logFunc              func(w http.ResponseWriter, req *http.Request)
 	runFunc              func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
 	containerVersionFunc func() (kubecontainer.Version, error)

@@ -91,6 +92,10 @@ func (fk *fakeKubelet) GetPods() []*api.Pod {
 	return fk.podsFunc()
 }
 
+func (fk *fakeKubelet) GetRunningPods() ([]*api.Pod, error) {
+	return fk.runningPodsFunc()
+}
+
 func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
 	fk.logFunc(w, req)
 }

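fakeKubelet's function fields let each test inject exactly the behavior it needs without a real kubelet behind the server. The same pattern in miniature, using net/http/httptest — all names here are illustrative, not the real test's:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// host mirrors the shape of the HostInterface method a handler needs.
type host interface {
	GetRunningPods() ([]string, error)
}

// fakeHost lets each test swap in its own behavior via a function field.
type fakeHost struct {
	runningPodsFunc func() ([]string, error)
}

func (f *fakeHost) GetRunningPods() ([]string, error) { return f.runningPodsFunc() }

func handleRunningPods(h host) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		pods, err := h.GetRunningPods()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprintf(w, "%v", pods)
	}
}

func main() {
	fake := &fakeHost{runningPodsFunc: func() ([]string, error) {
		return []string{"default/web-1"}, nil
	}}
	srv := httptest.NewServer(handleRunningPods(fake))
	defer srv.Close()

	resp, err := http.Get(srv.URL)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // [default/web-1]
}

Swapping runningPodsFunc per test case is what makes table-driven handler tests cheap: no runtime, no containers, just a stubbed return value.
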
@@ -0,0 +1,152 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+const (
+	// Interval to poll /runningpods on a node
+	pollInterval = 1 * time.Second
+)
+
+// getPodMatches returns the set of pod names on the given node that match the
+// podNamePrefix and namespace.
+func getPodMatches(c *client.Client, nodeName string, podNamePrefix string, namespace string) util.StringSet {
+	matches := util.NewStringSet()
+	Logf("Checking pods on node %v via /runningpods endpoint", nodeName)
+	runningPods, err := GetKubeletPods(c, nodeName)
+	if err != nil {
+		Logf("Error checking running pods on %v: %v", nodeName, err)
+		return matches
+	}
+	for _, pod := range runningPods.Items {
+		if pod.Namespace == namespace && strings.HasPrefix(pod.Name, podNamePrefix) {
+			matches.Insert(pod.Name)
+		}
+	}
+	return matches
+}
+
+// waitTillNPodsRunningOnNodes polls the /runningpods endpoint on kubelet until
+// it finds targetNumPods pods that match the given criteria (namespace and
+// podNamePrefix). Note that we usually use a label selector to filter pods that
+// belong to the same RC. However, we use podNamePrefix with namespace here
+// because pods returned from /runningpods do not contain the original label
+// information; they are reconstructed by examining the container runtime. In
+// the scope of this test, we do not expect pod naming conflicts, so
+// podNamePrefix should be sufficient to identify the pods.
+func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames util.StringSet, podNamePrefix string, namespace string, targetNumPods int, timeout time.Duration) error {
+	return wait.Poll(pollInterval, timeout, func() (bool, error) {
+		matchCh := make(chan util.StringSet, len(nodeNames))
+		for _, item := range nodeNames.List() {
+			// Launch a goroutine per node to check the pods running on the nodes.
+			nodeName := item
+			go func() {
+				matchCh <- getPodMatches(c, nodeName, podNamePrefix, namespace)
+			}()
+		}
+
+		seen := util.NewStringSet()
+		for i := 0; i < len(nodeNames.List()); i++ {
+			seen = seen.Union(<-matchCh)
+		}
+		if seen.Len() == targetNumPods {
+			return true, nil
+		}
+		Logf("Waiting for %d pods to be running on the node; %d are currently running", targetNumPods, seen.Len())
+		return false, nil
+	})
+}
+
+var _ = Describe("Clean up pods on node", func() {
+	var numNodes int
+	var nodeNames util.StringSet
+	framework := NewFramework("kubelet-delete")
+
+	BeforeEach(func() {
+		nodes, err := framework.Client.Nodes().List(labels.Everything(), fields.Everything())
+		expectNoError(err)
+		numNodes = len(nodes.Items)
+		nodeNames = util.NewStringSet()
+		for _, node := range nodes.Items {
+			nodeNames.Insert(node.Name)
+		}
+	})
+
+	type DeleteTest struct {
+		podsPerNode int
+		timeout     time.Duration
+	}
+
+	deleteTests := []DeleteTest{
+		{podsPerNode: 10, timeout: 1 * time.Minute},
+	}
+
+	for _, itArg := range deleteTests {
+		name := fmt.Sprintf(
+			"kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
+		It(name, func() {
+			totalPods := itArg.podsPerNode * numNodes
+
+			By(fmt.Sprintf("Creating a RC of %d pods and waiting until all pods of this RC are running", totalPods))
+			rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(util.NewUUID()))
+
+			Expect(RunRC(RCConfig{
+				Client:    framework.Client,
+				Name:      rcName,
+				Namespace: framework.Namespace.Name,
+				Image:     "gcr.io/google_containers/pause:go",
+				Replicas:  totalPods,
+			})).NotTo(HaveOccurred())
+
+			// Perform a sanity check so that we know all desired pods are
+			// running on the nodes according to kubelet. The timeout is set to
+			// only 30 seconds here because RunRC already waited for all pods to
+			// transition to the running status.
+			Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, totalPods,
+				time.Second*30)).NotTo(HaveOccurred())
+
+			By("Deleting the RC")
+			DeleteRC(framework.Client, framework.Namespace.Name, rcName)
+			// Check that the pods really are gone by querying /runningpods on the
+			// node. The /runningpods handler checks the container runtime (or its
+			// cache) and returns a list of running pods. Some possible causes of
+			// failures are:
+			//   - kubelet deadlock
+			//   - a bug in graceful termination (if it is enabled)
+			//   - docker slow to delete pods (or resource problems causing slowness)
+			start := time.Now()
+			Expect(waitTillNPodsRunningOnNodes(framework.Client, nodeNames, rcName, framework.Namespace.Name, 0,
+				itArg.timeout)).NotTo(HaveOccurred())
+			Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
+				time.Since(start))
+		})
+	}
+})

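waitTillNPodsRunningOnNodes fans out one goroutine per node and collects results through a channel buffered to the fan-out size, so no sender can block. The same shape in isolation — a toy sketch in which checkNode stands in for getPodMatches:

package main

import (
	"fmt"
	"time"
)

// checkNode stands in for getPodMatches: it reports the matching pod
// names it found on one node.
func checkNode(node string) []string {
	time.Sleep(10 * time.Millisecond) // pretend to hit /runningpods
	return []string{node + "/pod-0"}
}

func main() {
	nodes := []string{"node-a", "node-b", "node-c"}

	// Buffer the channel to the fan-out size so no sender can block,
	// even if the receiver bailed out early.
	results := make(chan []string, len(nodes))
	for _, n := range nodes {
		n := n // capture the loop variable, as the test does with nodeName := item
		go func() { results <- checkNode(n) }()
	}

	// Fan-in: exactly one receive per goroutine launched.
	seen := make(map[string]bool)
	for range nodes {
		for _, pod := range <-results {
			seen[pod] = true
		}
	}
	fmt.Println(len(seen), "pods seen")
}
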
@@ -25,6 +25,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"

@@ -171,15 +172,19 @@ func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nod
 	return badMetrics, nil
 }
 
-// Retrieve metrics from the kubelet server of the given node.
-func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
-	metric, err := c.Get().
+// Performs a get on a node proxy endpoint given the nodename and rest client.
+func nodeProxyRequest(c *client.Client, node, endpoint string) client.Result {
+	return c.Get().
 		Prefix("proxy").
 		Resource("nodes").
 		Name(fmt.Sprintf("%v:%v", node, ports.KubeletPort)).
-		Suffix("metrics").
-		Do().
-		Raw()
+		Suffix(endpoint).
+		Do()
+}
+
+// Retrieve metrics from the kubelet server of the given node.
+func getKubeletMetricsThroughProxy(c *client.Client, node string) (string, error) {
+	metric, err := nodeProxyRequest(c, node, "metrics").Raw()
 	if err != nil {
 		return "", err
 	}

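The chained rest-client calls assemble an apiserver proxy path rather than talking to the node directly. A sketch of the path the chain produces — the /api/v1beta3 prefix and the apiserver address are assumptions for illustration; the real client fills the prefix in from its config:

package main

import (
	"fmt"
	"net/url"
	"path"
)

// proxyPath mirrors what the Prefix/Resource/Name/Suffix chain assembles.
const kubeletPort = 10250

func proxyPath(apiPrefix, node, endpoint string) string {
	return path.Join(apiPrefix, "proxy", "nodes",
		fmt.Sprintf("%v:%v", node, kubeletPort), endpoint)
}

func main() {
	u := url.URL{
		Scheme: "https",
		Host:   "apiserver.example:6443", // hypothetical apiserver address
		Path:   proxyPath("/api/v1beta3", "node-1", "runningpods"),
	}
	fmt.Println(u.String())
	// https://apiserver.example:6443/api/v1beta3/proxy/nodes/node-1:10250/runningpods
}

Routing through the apiserver proxy means the test client reuses the cluster's existing auth instead of needing credentials for every kubelet.
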
@@ -200,3 +205,14 @@ func getKubeletMetricsThroughNode(nodeName string) (string, error) {
 	}
 	return string(body), nil
 }
+
+// GetKubeletPods retrieves the list of running pods on the kubelet. The pods
+// include necessary information (e.g., UID, name, namespace for
+// pods/containers), but do not contain the full spec.
+func GetKubeletPods(c *client.Client, node string) (*api.PodList, error) {
+	result := &api.PodList{}
+	if err := nodeProxyRequest(c, node, "runningpods").Into(result); err != nil {
+		return &api.PodList{}, err
+	}
+	return result, nil
+}