Merge pull request #75096 from tnozicka/fix-kubectl-run-watches

Fix kubectl run watches
pull/564/head
Kubernetes Prow Robot 2019-03-07 22:44:18 -08:00 committed by GitHub
commit 2e64704425
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 12 deletions

View File

@ -23,6 +23,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
@ -31,6 +32,7 @@ go_library(
"//staging/src/k8s.io/client-go/dynamic:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//vendor/github.com/docker/distribution/reference:go_default_library", "//vendor/github.com/docker/distribution/reference:go_default_library",
"//vendor/github.com/spf13/cobra:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library",

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@ -37,6 +38,7 @@ import (
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch" watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/attach" "k8s.io/kubernetes/pkg/kubectl/cmd/attach"
@ -490,18 +492,41 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
// waitForPod watches the given pod until the exitCondition is true // waitForPod watches the given pod until the exitCondition is true
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) { func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) {
w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name}))
if err != nil {
return nil, err
}
// TODO: expose the timeout // TODO: expose the timeout
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second) ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second)
defer cancel() defer cancel()
preconditionFunc := func(store cache.Store) (bool, error) {
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: ns, Name: name})
if err != nil {
return true, err
}
if !exists {
// We need to make sure we see the object in the cache before we start waiting for events
// or we would be waiting for the timeout if such object didn't exist.
// (e.g. it was deleted before we started informers so they wouldn't even see the delete event)
return true, errors.NewNotFound(corev1.Resource("pods"), name)
}
return false, nil
}
fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector
return podClient.Pods(ns).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fieldSelector
return podClient.Pods(ns).Watch(options)
},
}
intr := interrupt.New(nil, cancel) intr := interrupt.New(nil, cancel)
var result *corev1.Pod var result *corev1.Pod
err = intr.Run(func() error { err := intr.Run(func() error {
ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) { ev, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, preconditionFunc, func(ev watch.Event) (bool, error) {
return exitCondition(ev) return exitCondition(ev)
}) })
if ev != nil { if ev != nil {
@ -510,11 +535,6 @@ func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitConditio
return err return err
}) })
// Fix generic not found error.
if err != nil && errors.IsNotFound(err) {
err = errors.NewNotFound(corev1.Resource("pods"), name)
}
return result, err return result, err
} }