mirror of https://github.com/k3s-io/k3s
Fix `kubectl run` watches in waitForPod
parent
ef5ba5cc8b
commit
2e7242aac8
|
@ -29,6 +29,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
@ -37,6 +38,7 @@ import (
|
|||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
"k8s.io/kubernetes/pkg/kubectl/cmd/attach"
|
||||
|
@ -490,18 +492,41 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
|
|||
|
||||
// waitForPod watches the given pod until the exitCondition is true
|
||||
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) {
|
||||
w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: expose the timeout
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second)
|
||||
defer cancel()
|
||||
|
||||
preconditionFunc := func(store cache.Store) (bool, error) {
|
||||
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: ns, Name: name})
|
||||
if err != nil {
|
||||
return true, err
|
||||
}
|
||||
if !exists {
|
||||
// We need to make sure we see the object in the cache before we start waiting for events
|
||||
// or we would be waiting for the timeout if such object didn't exist.
|
||||
// (e.g. it was deleted before we started informers so they wouldn't even see the delete event)
|
||||
return true, errors.NewNotFound(corev1.Resource("pods"), name)
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return podClient.Pods(ns).List(options)
|
||||
},
|
||||
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
|
||||
options.FieldSelector = fieldSelector
|
||||
return podClient.Pods(ns).Watch(options)
|
||||
},
|
||||
}
|
||||
|
||||
intr := interrupt.New(nil, cancel)
|
||||
var result *corev1.Pod
|
||||
err = intr.Run(func() error {
|
||||
ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
|
||||
err := intr.Run(func() error {
|
||||
ev, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, preconditionFunc, func(ev watch.Event) (bool, error) {
|
||||
return exitCondition(ev)
|
||||
})
|
||||
if ev != nil {
|
||||
|
@ -510,11 +535,6 @@ func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitConditio
|
|||
return err
|
||||
})
|
||||
|
||||
// Fix generic not found error.
|
||||
if err != nil && errors.IsNotFound(err) {
|
||||
err = errors.NewNotFound(corev1.Resource("pods"), name)
|
||||
}
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue