Switch kubectl to use watch.Until

pull/6/head
bindata-mockuser 2016-08-08 19:56:19 +02:00 committed by Michail Kargakis
parent 158dc1a863
commit 5ad518cd2b
7 changed files with 116 additions and 130 deletions

View File

@ -357,8 +357,9 @@ var fieldMappings = versionToResourceToFieldMapping{
nodeUnschedulable: nodeUnschedulable,
},
"pods": clientFieldNameToAPIVersionFieldName{
podHost: podHost,
podStatus: podStatus,
objectNameField: objectNameField,
podHost: podHost,
podStatus: podStatus,
},
"secrets": clientFieldNameToAPIVersionFieldName{
secretType: secretType,

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/watch"
)
@ -266,17 +267,22 @@ func RunGet(f *cmdutil.Factory, out io.Writer, errOut io.Writer, cmd *cobra.Comm
first := true
filteredResourceCount = 0
kubectl.WatchLoop(w, func(e watch.Event) error {
if !isList && first {
// drop the initial watch event in the single resource case
first = false
return nil
}
err := printer.PrintObj(e.Object, out)
if err == nil {
intr := interrupt.New(nil, w.Stop)
intr.Run(func() error {
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
if !isList && first {
// drop the initial watch event in the single resource case
first = false
return false, nil
}
err := printer.PrintObj(e.Object, out)
if err != nil {
return false, err
}
filteredResourceCount++
cmdutil.PrintFilterCount(filteredResourceCount, mapping.Resource, errOut, filterOpts)
}
return false, nil
})
return err
})
return nil

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/watch"
"github.com/spf13/cobra"
@ -125,18 +126,22 @@ func RunStatus(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer, args []str
}
// if the rollout isn't done yet, keep watching deployment status
kubectl.WatchLoop(w, func(e watch.Event) error {
// print deployment's status
status, done, err := statusViewer.Status(cmdNamespace, info.Name)
if err != nil {
return err
}
fmt.Fprintf(out, "%s", status)
// Quit waiting if the rollout is done
if done {
w.Stop()
}
return nil
intr := interrupt.New(nil, w.Stop)
intr.Run(func() error {
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
// print deployment's status
status, done, err := statusViewer.Status(cmdNamespace, info.Name)
if err != nil {
return false, err
}
fmt.Fprintf(out, "%s", status)
// Quit waiting if the rollout is done
if done {
return true, nil
}
return false, nil
})
return err
})
return nil
}

View File

@ -28,16 +28,19 @@ import (
"github.com/docker/distribution/reference"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/unversioned"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
conditions "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/kubectl"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/runtime"
uexec "k8s.io/kubernetes/pkg/util/exec"
"k8s.io/kubernetes/pkg/util/interrupt"
"k8s.io/kubernetes/pkg/watch"
)
@ -372,90 +375,90 @@ func contains(resourcesList map[string]*unversioned.APIResourceList, resource un
// waitForPod watches the given pod until the exitCondition is true. Each two seconds
// the tick function is called e.g. for progress output.
func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition func(*api.Pod) bool, tick func(*api.Pod)) (*api.Pod, error) {
pod, err := podClient.Pods(ns).Get(name)
if err != nil {
return nil, err
}
if exitCondition(pod) {
return pod, nil
}
tick(pod)
w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: pod.Name, ResourceVersion: pod.ResourceVersion}))
func waitForPod(podClient coreclient.PodsGetter, ns, name string, exitCondition watch.ConditionFunc, tick func(*api.Pod)) (*api.Pod, error) {
w, err := podClient.Pods(ns).Watch(api.SingleObject(api.ObjectMeta{Name: name}))
if err != nil {
return nil, err
}
t := time.NewTicker(2 * time.Second)
defer t.Stop()
pods := make(chan *api.Pod) // observed pods passed to the exitCondition
defer close(pods)
// wait for the first event, then start the 2 sec ticker and loop
go func() {
for range t.C {
tick(pod)
pod := <-pods
if pod == nil {
return
}
tick(pod)
t := time.NewTicker(2 * time.Second)
defer t.Stop()
for {
select {
case pod = <-pods:
if pod == nil {
return
}
case _, ok := <-t.C:
if !ok {
return
}
tick(pod)
}
}
}()
err = nil
result := pod
kubectl.WatchLoop(w, func(ev watch.Event) error {
switch ev.Type {
case watch.Added, watch.Modified:
pod = ev.Object.(*api.Pod)
if exitCondition(pod) {
result = pod
w.Stop()
intr := interrupt.New(nil, w.Stop)
var result *api.Pod
intr.Run(func() error {
ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) {
c, err := exitCondition(ev)
if c == false && err == nil {
pods <- ev.Object.(*api.Pod) // send to ticker
}
case watch.Deleted:
w.Stop()
case watch.Error:
result = nil
err = fmt.Errorf("failed to watch pod %s/%s", ns, name)
w.Stop()
}
return nil
return c, err
})
result = ev.Object.(*api.Pod)
return err
})
return result, err
}
func waitForPodRunning(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
switch pod.Status.Phase {
case api.PodRunning:
for _, status := range pod.Status.ContainerStatuses {
if !status.Ready {
return false
}
}
return true
case api.PodSucceeded, api.PodFailed:
return true
default:
return false
}
}
return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) {
pod, err := waitForPod(podClient, ns, name, conditions.PodRunningAndReady, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to be running, status is %s, pod ready: false\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})
// fix generic not found error with empty name in PodRunningAndReady
if err != nil && errors.IsNotFound(err) {
return nil, errors.NewNotFound(api.Resource("pods"), name)
}
return pod, err
}
func waitForPodTerminated(podClient coreclient.PodsGetter, ns, name string, out io.Writer, quiet bool) (*api.Pod, error) {
exitCondition := func(pod *api.Pod) bool {
return pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed
}
return waitForPod(podClient, ns, name, exitCondition, func(pod *api.Pod) {
pod, err := waitForPod(podClient, ns, name, conditions.PodCompleted, func(pod *api.Pod) {
if !quiet {
fmt.Fprintf(out, "Waiting for pod %s/%s to terminate, status is %s\n", pod.Namespace, pod.Name, pod.Status.Phase)
}
})
// fix generic not found error with empty name in PodCompleted
if err != nil && errors.IsNotFound(err) {
return nil, errors.NewNotFound(api.Resource("pods"), name)
}
return pod, err
}
func handleAttachPod(f *cmdutil.Factory, podClient coreclient.PodsGetter, ns, name string, opts *AttachOptions, quiet bool) error {
pod, err := waitForPodRunning(podClient, ns, name, opts.Out, quiet)
if err != nil {
if err != nil && err != conditions.ErrPodCompleted {
return err
}
ctrName, err := opts.GetContainerName(pod)

View File

@ -1,45 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubectl
import (
"os"
"os/signal"
"k8s.io/kubernetes/pkg/watch"
)
// WatchLoop loops, passing events in w to fn.
// If user sends interrupt signal, shut down cleanly. Otherwise, never return.
func WatchLoop(w watch.Interface, fn func(watch.Event) error) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
defer signal.Stop(signals)
for {
select {
case event, ok := <-w.ResultChan():
if !ok {
return
}
if err := fn(event); err != nil {
w.Stop()
}
case <-signals:
w.Stop()
}
}
}

View File

@ -32,6 +32,7 @@ type ConditionFunc func(event Event) (bool, error)
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
// If no event has been received, the returned event will be nil.
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
// A zero timeout means to wait forever.
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
ch := watcher.ResultChan()
defer watcher.Stop()
@ -40,7 +41,7 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
after = time.After(timeout)
} else {
ch := make(chan time.Time)
close(ch)
defer close(ch)
after = ch
}
var lastEvent *Event

View File

@ -23,7 +23,6 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestUntil(t *testing.T) {
@ -83,17 +82,33 @@ func TestUntilMultipleConditions(t *testing.T) {
func TestUntilTimeout(t *testing.T) {
fw := NewFake()
go func() {
var obj *api.Pod
fw.Add(obj)
fw.Modify(obj)
}()
conditions := []ConditionFunc{
func(event Event) (bool, error) { return event.Type == Added, nil },
func(event Event) (bool, error) {
return event.Type == Added, nil
},
func(event Event) (bool, error) {
return event.Type == Modified, nil
},
}
timeout := time.Duration(0)
lastEvent, err := Until(timeout, fw, conditions...)
if err != wait.ErrWaitTimeout {
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}
if lastEvent != nil {
t.Fatalf("expected nil event, got %#v", lastEvent)
if lastEvent == nil {
t.Fatal("expected an event")
}
if lastEvent.Type != Modified {
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
}
if got, isPod := lastEvent.Object.(*api.Pod); !isPod {
t.Fatalf("expected a pod event, got %#v", got)
}
}