mirror of https://github.com/k3s-io/k3s
Merge pull request #24567 from yifan-gu/post_start_hook
Automatic merge from submit-queue. rkt: Add post-start hook support. This adds a poll-and-timeout procedure after the pod is started, to make sure the post-start hooks execute only once the container is actually running. This is a temporary workaround for implementing post-start hooks; the long-term solution is to use lifecycle events to trigger those hooks, see https://github.com/kubernetes/kubernetes/issues/23084. This also fixes a bug where the container ID of a non-running container was looked up when running the pre-stop hook. cc @sjpotter @euank @kubernetes/sig-node pull/6/head
commit
4c7abddc1c
|
@ -46,7 +46,7 @@ func buildContainerID(c *containerID) kubecontainer.ContainerID {
|
|||
func parseContainerID(id kubecontainer.ContainerID) (*containerID, error) {
|
||||
tuples := strings.Split(id.ID, ":")
|
||||
if len(tuples) != 2 {
|
||||
return nil, fmt.Errorf("rkt: cannot parse container ID for: %v", id)
|
||||
return nil, fmt.Errorf("rkt: cannot parse container ID for: %q, required format is [UUID:APPNAME]", id)
|
||||
}
|
||||
return &containerID{
|
||||
uuid: tuples[0],
|
||||
|
|
|
@ -79,7 +79,7 @@ func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPod
|
|||
return &rktapi.InspectPodResponse{Pod: pod}, f.err
|
||||
}
|
||||
}
|
||||
return &rktapi.InspectPodResponse{Pod: nil}, f.err
|
||||
return &rktapi.InspectPodResponse{}, fmt.Errorf("pod %q not found", in.Id)
|
||||
}
|
||||
|
||||
func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) {
|
||||
|
|
|
@ -55,6 +55,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
utilstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
utilwait "k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -1031,10 +1032,63 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
|
|||
}
|
||||
|
||||
r.generateEvents(runtimePod, "Started", nil)
|
||||
|
||||
// This is a temporary solution until we have a clean design on how
|
||||
// kubelet handles events. See https://github.com/kubernetes/kubernetes/issues/23084.
|
||||
if err := r.runLifecycleHooks(pod, runtimePod, lifecyclePostStartHook); err != nil {
|
||||
if errKill := r.KillPod(pod, *runtimePod); errKill != nil {
|
||||
return errors.NewAggregate([]error{err, errKill})
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) error {
|
||||
func (r *Runtime) runPreStopHook(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container) error {
|
||||
glog.V(4).Infof("rkt: Running pre-stop hook for container %q of pod %q", container.Name, format.Pod(pod))
|
||||
return r.runner.Run(containerID, pod, container, container.Lifecycle.PreStop)
|
||||
}
|
||||
|
||||
func (r *Runtime) runPostStartHook(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container) error {
|
||||
glog.V(4).Infof("rkt: Running post-start hook for container %q of pod %q", container.Name, format.Pod(pod))
|
||||
cid, err := parseContainerID(containerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse container ID %v", containerID)
|
||||
}
|
||||
|
||||
isContainerRunning := func() (done bool, err error) {
|
||||
resp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{Id: cid.uuid})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod))
|
||||
}
|
||||
|
||||
for _, app := range resp.Pod.Apps {
|
||||
if app.Name == cid.appName {
|
||||
return app.State == rktapi.AppState_APP_STATE_RUNNING, nil
|
||||
}
|
||||
}
|
||||
return false, fmt.Errorf("failed to find container %q in rkt pod %q", cid.appName, cid.uuid)
|
||||
}
|
||||
|
||||
// TODO(yifan): Polling the pod's state for now.
|
||||
timeout := time.Second * 5
|
||||
pollInterval := time.Millisecond * 500
|
||||
if err := utilwait.Poll(pollInterval, timeout, isContainerRunning); err != nil {
|
||||
return fmt.Errorf("rkt: Pod %q doesn't become running in %v: %v", format.Pod(pod), timeout, err)
|
||||
}
|
||||
|
||||
return r.runner.Run(containerID, pod, container, container.Lifecycle.PostStart)
|
||||
}
|
||||
|
||||
// lifecycleHookType names the kind of container lifecycle hook to run.
type lifecycleHookType string
|
||||
|
||||
// Hook kinds accepted wherever a lifecycleHookType is expected.
const (
|
||||
// lifecyclePostStartHook selects the containers' PostStart handlers.
lifecyclePostStartHook lifecycleHookType = "post-start"
|
||||
// lifecyclePreStopHook selects the containers' PreStop handlers.
lifecyclePreStopHook lifecycleHookType = "pre-stop"
|
||||
)
|
||||
|
||||
func (r *Runtime) runLifecycleHooks(pod *api.Pod, runtimePod *kubecontainer.Pod, typ lifecycleHookType) error {
|
||||
var wg sync.WaitGroup
|
||||
var errlist []error
|
||||
errCh := make(chan error, len(pod.Spec.Containers))
|
||||
|
@ -1042,21 +1096,43 @@ func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) er
|
|||
wg.Add(len(pod.Spec.Containers))
|
||||
|
||||
for i, c := range pod.Spec.Containers {
|
||||
if c.Lifecycle == nil || c.Lifecycle.PreStop == nil {
|
||||
var hookFunc func(kubecontainer.ContainerID, *api.Pod, *api.Container) error
|
||||
|
||||
switch typ {
|
||||
case lifecyclePostStartHook:
|
||||
if c.Lifecycle != nil && c.Lifecycle.PostStart != nil {
|
||||
hookFunc = r.runPostStartHook
|
||||
}
|
||||
case lifecyclePreStopHook:
|
||||
if c.Lifecycle != nil && c.Lifecycle.PreStop != nil {
|
||||
hookFunc = r.runPreStopHook
|
||||
}
|
||||
default:
|
||||
errCh <- fmt.Errorf("Unrecognized lifecycle hook type %q for container %q in pod %q", typ, c.Name, format.Pod(pod))
|
||||
}
|
||||
|
||||
if hookFunc == nil {
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
hook := c.Lifecycle.PreStop
|
||||
containerID := runtimePod.Containers[i].ID
|
||||
container := &pod.Spec.Containers[i]
|
||||
runtimeContainer := runtimePod.FindContainerByName(container.Name)
|
||||
if runtimeContainer == nil {
|
||||
// Container already gone.
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
containerID := runtimeContainer.ID
|
||||
|
||||
go func() {
|
||||
if err := r.runner.Run(containerID, pod, container, hook); err != nil {
|
||||
glog.Errorf("rkt: Failed to run pre-stop hook for container %q of pod %q: %v", container.Name, format.Pod(pod), err)
|
||||
defer wg.Done()
|
||||
if err := hookFunc(containerID, pod, container); err != nil {
|
||||
glog.Errorf("rkt: Failed to run %s hook for container %q of pod %q: %v", typ, container.Name, format.Pod(pod), err)
|
||||
errCh <- err
|
||||
} else {
|
||||
glog.V(4).Infof("rkt: %s hook completed successfully for container %q of pod %q", typ, container.Name, format.Pod(pod))
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
|
@ -1188,23 +1264,18 @@ func (r *Runtime) waitPreStopHooks(pod *api.Pod, runningPod *kubecontainer.Pod)
|
|||
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
if err := r.runPreStopHook(pod, runningPod); err != nil {
|
||||
errCh <- err
|
||||
if err := r.runLifecycleHooks(pod, runningPod, lifecyclePreStopHook); err != nil {
|
||||
glog.Errorf("rkt: Some pre-stop hooks failed for pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
close(errCh)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(time.Duration(gracePeriod) * time.Second):
|
||||
glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %v", gracePeriod, format.Pod(pod))
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
glog.Errorf("rkt: Some pre-stop hooks failed for pod %v: %v", format.Pod(pod), err)
|
||||
} else {
|
||||
glog.V(4).Infof("rkt: pre-stop hooks for pod %v completed", format.Pod(pod))
|
||||
}
|
||||
glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %q", gracePeriod, format.Pod(pod))
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1204,7 +1204,7 @@ func TestGenerateRunCommand(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPreStopHooks(t *testing.T) {
|
||||
func TestLifeCycleHooks(t *testing.T) {
|
||||
runner := lifecycle.NewFakeHandlerRunner()
|
||||
fr := newFakeRktInterface()
|
||||
fs := newFakeSystemd()
|
||||
|
@ -1219,6 +1219,7 @@ func TestPreStopHooks(t *testing.T) {
|
|||
tests := []struct {
|
||||
pod *api.Pod
|
||||
runtimePod *kubecontainer.Pod
|
||||
postStartRuns []string
|
||||
preStopRuns []string
|
||||
err error
|
||||
}{
|
||||
|
@ -1242,10 +1243,11 @@ func TestPreStopHooks(t *testing.T) {
|
|||
},
|
||||
},
|
||||
[]string{},
|
||||
[]string{},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
// Case 1, containers with pre-stop hook.
|
||||
// Case 1, containers with post-start and pre-stop hooks.
|
||||
&api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
|
@ -1257,13 +1259,29 @@ func TestPreStopHooks(t *testing.T) {
|
|||
{
|
||||
Name: "container-name-1",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PreStop: &api.Handler{
|
||||
PostStart: &api.Handler{
|
||||
Exec: &api.ExecAction{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "container-name-2",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PostStart: &api.Handler{
|
||||
HTTPGet: &api.HTTPGetAction{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "container-name-3",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PreStop: &api.Handler{
|
||||
Exec: &api.ExecAction{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "container-name-4",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PreStop: &api.Handler{
|
||||
HTTPGet: &api.HTTPGetAction{},
|
||||
|
@ -1275,13 +1293,31 @@ func TestPreStopHooks(t *testing.T) {
|
|||
},
|
||||
&kubecontainer.Pod{
|
||||
Containers: []*kubecontainer.Container{
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-2")},
|
||||
{
|
||||
ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-4"),
|
||||
Name: "container-name-4",
|
||||
},
|
||||
{
|
||||
ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-3"),
|
||||
Name: "container-name-3",
|
||||
},
|
||||
{
|
||||
ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-2"),
|
||||
Name: "container-name-2",
|
||||
},
|
||||
{
|
||||
ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-1"),
|
||||
Name: "container-name-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
[]string{
|
||||
"exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://id-1",
|
||||
"http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://id-2",
|
||||
"exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://uuid:container-name-1",
|
||||
"http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://uuid:container-name-2",
|
||||
},
|
||||
[]string{
|
||||
"exec on pod: pod-1_ns-1(uid-1), container: container-name-3: rkt://uuid:container-name-3",
|
||||
"http-get on pod: pod-1_ns-1(uid-1), container: container-name-4: rkt://uuid:container-name-4",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
|
@ -1298,6 +1334,7 @@ func TestPreStopHooks(t *testing.T) {
|
|||
{
|
||||
Name: "container-name-1",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PostStart: &api.Handler{},
|
||||
PreStop: &api.Handler{},
|
||||
},
|
||||
},
|
||||
|
@ -1306,9 +1343,13 @@ func TestPreStopHooks(t *testing.T) {
|
|||
},
|
||||
&kubecontainer.Pod{
|
||||
Containers: []*kubecontainer.Container{
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||
{
|
||||
ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-1"),
|
||||
Name: "container-name-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
[]string{},
|
||||
[]string{},
|
||||
errors.NewAggregate([]error{fmt.Errorf("Invalid handler: %v", &api.Handler{})}),
|
||||
},
|
||||
|
@ -1317,8 +1358,28 @@ func TestPreStopHooks(t *testing.T) {
|
|||
for i, tt := range tests {
|
||||
testCaseHint := fmt.Sprintf("test case #%d", i)
|
||||
|
||||
pod := &rktapi.Pod{Id: "uuid"}
|
||||
for _, c := range tt.runtimePod.Containers {
|
||||
pod.Apps = append(pod.Apps, &rktapi.App{
|
||||
Name: c.Name,
|
||||
State: rktapi.AppState_APP_STATE_RUNNING,
|
||||
})
|
||||
}
|
||||
fr.pods = []*rktapi.Pod{pod}
|
||||
|
||||
// Run post-start hooks
|
||||
err := rkt.runLifecycleHooks(tt.pod, tt.runtimePod, lifecyclePostStartHook)
|
||||
assert.Equal(t, tt.err, err, testCaseHint)
|
||||
|
||||
sort.Sort(sortedStringList(tt.postStartRuns))
|
||||
sort.Sort(sortedStringList(runner.HandlerRuns))
|
||||
|
||||
assert.Equal(t, tt.postStartRuns, runner.HandlerRuns, testCaseHint)
|
||||
|
||||
runner.Reset()
|
||||
|
||||
// Run pre-stop hooks.
|
||||
err := rkt.runPreStopHook(tt.pod, tt.runtimePod)
|
||||
err = rkt.runLifecycleHooks(tt.pod, tt.runtimePod, lifecyclePreStopHook)
|
||||
assert.Equal(t, tt.err, err, testCaseHint)
|
||||
|
||||
sort.Sort(sortedStringList(tt.preStopRuns))
|
||||
|
|
Loading…
Reference in New Issue