diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index d41bb481d0..77f6b65d3d 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -121,10 +121,10 @@ for version in "${kube_api_versions[@]}"; do kubectl get pods "${kube_flags[@]}" kubectl get pod redis-master "${kube_flags[@]}" [ "$(kubectl get pod redis-master -o template --output-version=v1beta1 -t '{{ .id }}' "${kube_flags[@]}")" == "redis-master" ] - output_pod=$(kubectl get pod redis-master -o json --output-version=v1beta1 "${kube_flags[@]}") + output_pod=$(kubectl get pod redis-master -o yaml --output-version=v1beta1 "${kube_flags[@]}") kubectl delete pod redis-master "${kube_flags[@]}" before="$(kubectl get pods -o template -t "{{ len .items }}" "${kube_flags[@]}")" - echo $output_pod | kubectl create -f - "${kube_flags[@]}" + echo "${output_pod}" | kubectl create -f - "${kube_flags[@]}" after="$(kubectl get pods -o template -t "{{ len .items }}" "${kube_flags[@]}")" [ "$((${after} - ${before}))" -eq 1 ] kubectl get pods -o yaml --output-version=v1beta1 "${kube_flags[@]}" | grep -q "id: redis-master" @@ -135,6 +135,10 @@ for version in "${kube_api_versions[@]}"; do kubectl get services "${kube_flags[@]}" kubectl create -f examples/guestbook/frontend-service.json "${kube_flags[@]}" kubectl get services "${kube_flags[@]}" + output_service=$(kubectl get service frontend -o json --output-version=v1beta3 "${kube_flags[@]}") + kubectl delete service frontend "${kube_flags[@]}" + echo "${output_service}" | kubectl create -f - "${kube_flags[@]}" + kubectl get services "${kube_flags[@]}" kubectl delete service frontend "${kube_flags[@]}" kube::log::status "Testing kubectl(${version}:replicationcontrollers)" diff --git a/pkg/kubectl/resource/builder_test.go b/pkg/kubectl/resource/builder_test.go index eeb59736e1..5be98be94d 100644 --- a/pkg/kubectl/resource/builder_test.go +++ b/pkg/kubectl/resource/builder_test.go @@ -25,6 +25,8 @@ import ( "reflect" "testing" + "github.com/ghodss/yaml" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" @@ -115,6 +117,26 @@ func streamTestData() (io.Reader, *api.PodList, *api.ServiceList) { return r, pods, svc } +func JSONToYAMLOrDie(in []byte) []byte { + data, err := yaml.JSONToYAML(in) + if err != nil { + panic(err) + } + return data +} + +func streamYAMLTestData() (io.Reader, *api.PodList, *api.ServiceList) { + pods, svc := testData() + r, w := io.Pipe() + go func() { + defer w.Close() + w.Write(JSONToYAMLOrDie([]byte(runtime.EncodeOrDie(latest.Codec, pods)))) + w.Write([]byte("\n---\n")) + w.Write(JSONToYAMLOrDie([]byte(runtime.EncodeOrDie(latest.Codec, svc)))) + }() + return r, pods, svc +} + type testVisitor struct { InjectErr error Infos []*Info @@ -370,6 +392,23 @@ func TestStream(t *testing.T) { } } +func TestYAMLStream(t *testing.T) { + r, pods, rc := streamYAMLTestData() + b := NewBuilder(latest.RESTMapper, api.Scheme, fakeClient()). + NamespaceParam("test").Stream(r, "STDIN").Flatten() + + test := &testVisitor{} + singular := false + + err := b.Do().IntoSingular(&singular).Visit(test.Handle) + if err != nil || singular || len(test.Infos) != 3 { + t.Fatalf("unexpected response: %v %f %#v", err, singular, test.Infos) + } + if !reflect.DeepEqual([]runtime.Object{&pods.Items[0], &pods.Items[1], &rc.Items[0]}, test.Objects()) { + t.Errorf("unexpected visited objects: %#v", test.Objects()) + } +} + func TestMultipleObject(t *testing.T) { r, pods, svc := streamTestData() obj, err := NewBuilder(latest.RESTMapper, api.Scheme, fakeClient()). diff --git a/pkg/kubectl/resource/visitor.go b/pkg/kubectl/resource/visitor.go index 4d820a138e..7098ee8d21 100644 --- a/pkg/kubectl/resource/visitor.go +++ b/pkg/kubectl/resource/visitor.go @@ -17,7 +17,6 @@ limitations under the License. package resource import ( - "encoding/json" "fmt" "io" "io/ioutil" @@ -31,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/yaml" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -344,7 +344,7 @@ func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors b // Visit implements Visitor over a stream. func (v *StreamVisitor) Visit(fn VisitorFunc) error { - d := json.NewDecoder(v.Reader) + d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096) for { ext := runtime.RawExtension{} if err := d.Decode(&ext); err != nil { diff --git a/pkg/util/yaml/decoder.go b/pkg/util/yaml/decoder.go new file mode 100644 index 0000000000..ccab145759 --- /dev/null +++ b/pkg/util/yaml/decoder.go @@ -0,0 +1,154 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package yaml + +import ( + "bufio" + "bytes" + "encoding/json" + "io" + "unicode" + + "github.com/ghodss/yaml" + "github.com/golang/glog" +) + +// YAMLToJSONDecoder decodes YAML documents from an io.Reader by +// separating individual documents. It first converts the YAML +// body to JSON, then unmarshals the JSON. +type YAMLToJSONDecoder struct { + scanner *bufio.Scanner +} + +// NewYAMLToJSONDecoder decodes YAML documents from the provided +// stream in chunks by converting each document (as defined by +// the YAML spec) into its own chunk, converting it to JSON via +// yaml.YAMLToJSON, and then passing it to json.Decoder. +func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder { + scanner := bufio.NewScanner(r) + scanner.Split(splitYAMLDocument) + return &YAMLToJSONDecoder{ + scanner: scanner, + } +} + +// Decode reads a YAML document as JSON from the stream or returns +// an error. The decoding rules match json.Unmarshal, not +// yaml.Unmarshal. +func (d *YAMLToJSONDecoder) Decode(into interface{}) error { + if d.scanner.Scan() { + data, err := yaml.YAMLToJSON(d.scanner.Bytes()) + if err != nil { + return err + } + return json.Unmarshal(data, into) + } + err := d.scanner.Err() + if err == nil { + err = io.EOF + } + return err +} + +const yamlSeparator = "\n---" + +// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. +func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + sep := len([]byte(yamlSeparator)) + if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 { + // We have a potential document terminator + i += sep + after := data[i:] + if len(after) == 0 { + // we can't read any more characters + if atEOF { + return len(data), data[:len(data)-sep], nil + } + return 0, nil, nil + } + if j := bytes.IndexByte(after, '\n'); j >= 0 { + return i + j + 1, data[0 : i-sep], nil + } + return 0, nil, nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), data, nil + } + // Request more data. + return 0, nil, nil +} + +// decoder is a convenience interface for Decode. +type decoder interface { + Decode(into interface{}) error +} + +// YAMLOrJSONDecoder attempts to decode a stream of JSON documents or +// YAML documents by sniffing for a leading { character. +type YAMLOrJSONDecoder struct { + r io.Reader + bufferSize int + + decoder decoder +} + +// NewYAMLOrJSONDecoder returns a decoder that will process YAML documents +// or JSON documents from the given reader as a stream. bufferSize determines +// how far into the stream the decoder will look to figure out whether this +// is a JSON stream (has whitespace followed by an open brace). +func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder { + return &YAMLOrJSONDecoder{ + r: r, + bufferSize: bufferSize, + } +} + +// Decode unmarshals the next object from the underlying stream into the +// provide object, or returns an error. +func (d *YAMLOrJSONDecoder) Decode(into interface{}) error { + if d.decoder == nil { + buffer, isJSON := guessJSONStream(d.r, d.bufferSize) + if isJSON { + glog.V(4).Infof("decoding stream as JSON") + d.decoder = json.NewDecoder(buffer) + } else { + glog.V(4).Infof("decoding stream as YAML") + d.decoder = NewYAMLToJSONDecoder(buffer) + } + } + return d.decoder.Decode(into) +} + +// guessJSONStream scans the provided reader up to size, looking +// for an open brace indicating this is JSON. It will return the +// bufio.Reader it creates for the consumer. +func guessJSONStream(r io.Reader, size int) (io.Reader, bool) { + buffer := bufio.NewReaderSize(r, size) + b, _ := buffer.Peek(size) + return buffer, hasPrefix(b, []byte("{")) +} + +// Return true if the first non-whitespace bytes in buf is +// prefix +func hasPrefix(buf []byte, prefix []byte) bool { + buf = bytes.TrimLeftFunc(buf, unicode.IsSpace) + return bytes.HasPrefix(buf, prefix) +} diff --git a/pkg/util/yaml/decoder_test.go b/pkg/util/yaml/decoder_test.go new file mode 100644 index 0000000000..c590b5452f --- /dev/null +++ b/pkg/util/yaml/decoder_test.go @@ -0,0 +1,199 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package yaml + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "testing" +) + +func TestSplitYAMLDocument(t *testing.T) { + testCases := []struct { + input string + atEOF bool + expect string + adv int + }{ + {"foo", true, "foo", 3}, + {"fo", false, "", 0}, + + {"---", true, "---", 3}, + {"---\n", true, "---\n", 4}, + {"---\n", false, "", 0}, + + {"\n---\n", false, "", 5}, + {"\n---\n", true, "", 5}, + + {"abc\n---\ndef", true, "abc", 8}, + {"def", true, "def", 3}, + {"", true, "", 0}, + } + for i, testCase := range testCases { + adv, token, err := splitYAMLDocument([]byte(testCase.input), testCase.atEOF) + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + if adv != testCase.adv { + t.Errorf("%d: advance did not match: %d %d", i, testCase.adv, adv) + } + if testCase.expect != string(token) { + t.Errorf("%d: token did not match: %q %q", i, testCase.expect, string(token)) + } + } +} + +func TestScanYAML(t *testing.T) { + s := bufio.NewScanner(bytes.NewReader([]byte(`--- +stuff: 1 + +--- + `))) + s.Split(splitYAMLDocument) + if !s.Scan() { + t.Fatalf("should have been able to scan") + } + t.Logf("scan: %s", s.Text()) + if !s.Scan() { + t.Fatalf("should have been able to scan") + } + t.Logf("scan: %s", s.Text()) + if s.Scan() { + t.Fatalf("scan should have been done") + } + if s.Err() != nil { + t.Fatalf("err should have been nil: %v", s.Err()) + } +} + +func TestDecodeYAML(t *testing.T) { + s := NewYAMLToJSONDecoder(bytes.NewReader([]byte(`--- +stuff: 1 + +--- + `))) + obj := generic{} + if err := s.Decode(&obj); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if fmt.Sprintf("%#v", obj) != `yaml.generic{"stuff":1}` { + t.Errorf("unexpected object: %#v", obj) + } + obj = generic{} + if err := s.Decode(&obj); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(obj) != 0 { + t.Fatalf("unexpected object: %#v", obj) + } + obj = generic{} + if err := s.Decode(&obj); err != io.EOF { + t.Fatalf("unexpected error: %v", err) + } +} + +type generic map[string]interface{} + +func TestYAMLOrJSONDecoder(t *testing.T) { + testCases := []struct { + input string + buffer int + isJSON bool + err bool + out []generic + }{ + {` {"1":2}{"3":4}`, 2, true, false, []generic{ + {"1": 2}, + {"3": 4}, + }}, + {" \n{}", 3, true, false, []generic{ + {}, + }}, + {" \na: b", 2, false, false, []generic{ + {"a": "b"}, + }}, + {` \n{"a": "b"}`, 2, false, true, []generic{ + {"a": "b"}, + }}, + {` {"a":"b"}`, 100, true, false, []generic{ + {"a": "b"}, + }}, + {"", 1, false, false, []generic{}}, + {"foo: bar\n---\nbaz: biz", 100, false, false, []generic{ + {"foo": "bar"}, + {"baz": "biz"}, + }}, + {"foo: bar\n---\n", 100, false, false, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n---", 100, false, false, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n--", 100, false, true, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n-", 100, false, true, []generic{ + {"foo": "bar"}, + }}, + {"foo: bar\n", 100, false, false, []generic{ + {"foo": "bar"}, + }}, + } + for i, testCase := range testCases { + decoder := NewYAMLOrJSONDecoder(bytes.NewReader([]byte(testCase.input)), testCase.buffer) + objs := []generic{} + + var err error + for { + out := make(generic) + err = decoder.Decode(&out) + if err != nil { + break + } + objs = append(objs, out) + } + if err != io.EOF { + switch { + case testCase.err && err == nil: + t.Errorf("%d: unexpected non-error", i) + continue + case !testCase.err && err != nil: + t.Errorf("%d: unexpected error: %v", i, err) + continue + case err != nil: + continue + } + } + switch decoder.decoder.(type) { + case *YAMLToJSONDecoder: + if testCase.isJSON { + t.Errorf("%d: expected JSON decoder, got YAML", i) + } + case *json.Decoder: + if !testCase.isJSON { + t.Errorf("%d: expected YAML decoder, got JSON", i) + } + } + if fmt.Sprintf("%#v", testCase.out) != fmt.Sprintf("%#v", objs) { + t.Errorf("%d: objects were not equal: \n%#v\n%#v", i, testCase.out, objs) + } + } +}