Merge pull request #27392 from sjenning/old-kubectl-watch

Automatic merge from submit-queue

allow watching old resources with kubectl

Right now, one can not watch a resource with kubectl whose resourceVersion is outside the etcd watch window.  Specifying resourceVersion=0 returns the current object, then watches from the current index.

This PR changes the logic to use resourceVersion=0, which will work regardless of the resourceVersion of the object, and discard the first event if --watch-only is specified.

@ncdc @aveshagarwal
pull/6/head
Kubernetes Submit Queue 2016-08-02 14:44:20 -07:00 committed by GitHub
commit f2a9ba2339
2 changed files with 115 additions and 16 deletions

View File

@ -182,9 +182,20 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string
if err != nil {
return err
}
rv, err := mapping.MetadataAccessor.ResourceVersion(obj)
if err != nil {
return err
// watching from resourceVersion 0, starts the watch at ~now and
// will return an initial watch event. Starting form ~now, rather
// the rv of the object will insure that we start the watch from
// inside the watch window, which the rv of the object might not be.
rv := "0"
isList := meta.IsListType(obj)
if isList {
// the resourceVersion of list objects is ~now but won't return
// an initial watch event
rv, err = mapping.MetadataAccessor.ResourceVersion(obj)
if err != nil {
return err
}
}
// print the current object
@ -200,7 +211,13 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string
return err
}
first := true
kubectl.WatchLoop(w, func(e watch.Event) error {
if !isList && first {
// drop the initial watch event in the single resource case
first = false
return nil
}
return printer.PrintObj(e.Object, out)
})
return nil

View File

@ -672,6 +672,14 @@ func TestGetByNameForcesFlag(t *testing.T) {
func watchTestData() ([]api.Pod, []watch.Event) {
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "test",
ResourceVersion: "9",
},
Spec: apitesting.DeepEqualSafePodSpec(),
},
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
@ -682,6 +690,30 @@ func watchTestData() ([]api.Pod, []watch.Event) {
},
}
events := []watch.Event{
// current state events
{
Type: watch.Added,
Object: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "bar",
Namespace: "test",
ResourceVersion: "9",
},
Spec: apitesting.DeepEqualSafePodSpec(),
},
},
{
Type: watch.Added,
Object: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "test",
ResourceVersion: "10",
},
Spec: apitesting.DeepEqualSafePodSpec(),
},
},
// resource events
{
Type: watch.Modified,
Object: &api.Pod{
@ -713,6 +745,12 @@ func TestWatchSelector(t *testing.T) {
f, tf, codec, ns := NewAPIFactory()
tf.Printer = &testPrinter{}
podList := &api.PodList{
Items: pods,
ListMeta: unversioned.ListMeta{
ResourceVersion: "10",
},
}
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
@ -721,9 +759,9 @@ func TestWatchSelector(t *testing.T) {
}
switch req.URL.Path {
case "/namespaces/test/pods":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &api.PodList{Items: pods})}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, podList)}, nil
case "/watch/namespaces/test/pods":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[2:])}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
@ -740,10 +778,10 @@ func TestWatchSelector(t *testing.T) {
cmd.Flags().Set("selector", "a=b")
cmd.Run(cmd, []string{"pods"})
expected := []runtime.Object{&api.PodList{Items: pods}, events[0].Object, events[1].Object}
expected := []runtime.Object{podList, events[2].Object, events[3].Object}
actual := tf.Printer.(*testPrinter).Objects
if !reflect.DeepEqual(expected, actual) {
t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected[0], actual[0])
t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected, actual)
}
if len(buf.String()) == 0 {
t.Errorf("unexpected empty output")
@ -760,9 +798,9 @@ func TestWatchResource(t *testing.T) {
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch req.URL.Path {
case "/namespaces/test/pods/foo":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[0])}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[1])}, nil
case "/watch/namespaces/test/pods/foo":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[1:])}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
@ -778,7 +816,7 @@ func TestWatchResource(t *testing.T) {
cmd.Flags().Set("watch", "true")
cmd.Run(cmd, []string{"pods", "foo"})
expected := []runtime.Object{&pods[0], events[0].Object, events[1].Object}
expected := []runtime.Object{&pods[1], events[2].Object, events[3].Object}
actual := tf.Printer.(*testPrinter).Objects
if !reflect.DeepEqual(expected, actual) {
t.Errorf("unexpected object:\nExpected: %#v\n\nGot: %#v\n\n", expected, actual)
@ -798,9 +836,9 @@ func TestWatchResourceIdentifiedByFile(t *testing.T) {
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch req.URL.Path {
case "/namespaces/test/replicationcontrollers/cassandra":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[0])}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[1])}, nil
case "/watch/namespaces/test/replicationcontrollers/cassandra":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[1:])}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
@ -816,7 +854,7 @@ func TestWatchResourceIdentifiedByFile(t *testing.T) {
cmd.Flags().Set("filename", "../../../examples/storage/cassandra/cassandra-controller.yaml")
cmd.Run(cmd, []string{})
expected := []runtime.Object{&pods[0], events[0].Object, events[1].Object}
expected := []runtime.Object{&pods[1], events[2].Object, events[3].Object}
actual := tf.Printer.(*testPrinter).Objects
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected object: %#v unexpected object: %#v", expected, actual)
@ -837,9 +875,9 @@ func TestWatchOnlyResource(t *testing.T) {
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch req.URL.Path {
case "/namespaces/test/pods/foo":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[0])}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, &pods[1])}, nil
case "/watch/namespaces/test/pods/foo":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events)}, nil
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[1:])}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
@ -855,7 +893,51 @@ func TestWatchOnlyResource(t *testing.T) {
cmd.Flags().Set("watch-only", "true")
cmd.Run(cmd, []string{"pods", "foo"})
expected := []runtime.Object{events[0].Object, events[1].Object}
expected := []runtime.Object{events[2].Object, events[3].Object}
actual := tf.Printer.(*testPrinter).Objects
if !reflect.DeepEqual(expected, actual) {
t.Errorf("unexpected object: %#v", actual)
}
if len(buf.String()) == 0 {
t.Errorf("unexpected empty output")
}
}
func TestWatchOnlyList(t *testing.T) {
pods, events := watchTestData()
f, tf, codec, ns := NewAPIFactory()
tf.Printer = &testPrinter{}
podList := &api.PodList{
Items: pods,
ListMeta: unversioned.ListMeta{
ResourceVersion: "10",
},
}
tf.Client = &fake.RESTClient{
NegotiatedSerializer: ns,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch req.URL.Path {
case "/namespaces/test/pods":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, podList)}, nil
case "/watch/namespaces/test/pods":
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: watchBody(codec, events[2:])}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
return nil, nil
}
}),
}
tf.Namespace = "test"
buf := bytes.NewBuffer([]byte{})
cmd := NewCmdGet(f, buf)
cmd.SetOutput(buf)
cmd.Flags().Set("watch-only", "true")
cmd.Run(cmd, []string{"pods"})
expected := []runtime.Object{events[2].Object, events[3].Object}
actual := tf.Printer.(*testPrinter).Objects
if !reflect.DeepEqual(expected, actual) {
t.Errorf("unexpected object: %#v", actual)