Merge pull request #3451 from smarterclayton/fix_yaml_streaming

Fix streaming YAML from STDIN in kubectl
pull/6/head
Daniel Smith 2015-01-13 23:25:39 -08:00
commit ff908a0d72
5 changed files with 400 additions and 4 deletions

View File

@ -121,10 +121,10 @@ for version in "${kube_api_versions[@]}"; do
kubectl get pods "${kube_flags[@]}"
kubectl get pod redis-master "${kube_flags[@]}"
[ "$(kubectl get pod redis-master -o template --output-version=v1beta1 -t '{{ .id }}' "${kube_flags[@]}")" == "redis-master" ]
output_pod=$(kubectl get pod redis-master -o json --output-version=v1beta1 "${kube_flags[@]}")
output_pod=$(kubectl get pod redis-master -o yaml --output-version=v1beta1 "${kube_flags[@]}")
kubectl delete pod redis-master "${kube_flags[@]}"
before="$(kubectl get pods -o template -t "{{ len .items }}" "${kube_flags[@]}")"
echo $output_pod | kubectl create -f - "${kube_flags[@]}"
echo "${output_pod}" | kubectl create -f - "${kube_flags[@]}"
after="$(kubectl get pods -o template -t "{{ len .items }}" "${kube_flags[@]}")"
[ "$((${after} - ${before}))" -eq 1 ]
kubectl get pods -o yaml --output-version=v1beta1 "${kube_flags[@]}" | grep -q "id: redis-master"
@ -135,6 +135,10 @@ for version in "${kube_api_versions[@]}"; do
kubectl get services "${kube_flags[@]}"
kubectl create -f examples/guestbook/frontend-service.json "${kube_flags[@]}"
kubectl get services "${kube_flags[@]}"
output_service=$(kubectl get service frontend -o json --output-version=v1beta3 "${kube_flags[@]}")
kubectl delete service frontend "${kube_flags[@]}"
echo "${output_service}" | kubectl create -f - "${kube_flags[@]}"
kubectl get services "${kube_flags[@]}"
kubectl delete service frontend "${kube_flags[@]}"
kube::log::status "Testing kubectl(${version}:replicationcontrollers)"

View File

@ -25,6 +25,8 @@ import (
"reflect"
"testing"
"github.com/ghodss/yaml"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
@ -115,6 +117,26 @@ func streamTestData() (io.Reader, *api.PodList, *api.ServiceList) {
return r, pods, svc
}
func JSONToYAMLOrDie(in []byte) []byte {
data, err := yaml.JSONToYAML(in)
if err != nil {
panic(err)
}
return data
}
func streamYAMLTestData() (io.Reader, *api.PodList, *api.ServiceList) {
pods, svc := testData()
r, w := io.Pipe()
go func() {
defer w.Close()
w.Write(JSONToYAMLOrDie([]byte(runtime.EncodeOrDie(latest.Codec, pods))))
w.Write([]byte("\n---\n"))
w.Write(JSONToYAMLOrDie([]byte(runtime.EncodeOrDie(latest.Codec, svc))))
}()
return r, pods, svc
}
type testVisitor struct {
InjectErr error
Infos []*Info
@ -370,6 +392,23 @@ func TestStream(t *testing.T) {
}
}
func TestYAMLStream(t *testing.T) {
r, pods, rc := streamYAMLTestData()
b := NewBuilder(latest.RESTMapper, api.Scheme, fakeClient()).
NamespaceParam("test").Stream(r, "STDIN").Flatten()
test := &testVisitor{}
singular := false
err := b.Do().IntoSingular(&singular).Visit(test.Handle)
if err != nil || singular || len(test.Infos) != 3 {
t.Fatalf("unexpected response: %v %f %#v", err, singular, test.Infos)
}
if !reflect.DeepEqual([]runtime.Object{&pods.Items[0], &pods.Items[1], &rc.Items[0]}, test.Objects()) {
t.Errorf("unexpected visited objects: %#v", test.Objects())
}
}
func TestMultipleObject(t *testing.T) {
r, pods, svc := streamTestData()
obj, err := NewBuilder(latest.RESTMapper, api.Scheme, fakeClient()).

View File

@ -17,7 +17,6 @@ limitations under the License.
package resource
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -31,6 +30,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/yaml"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -344,7 +344,7 @@ func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors b
// Visit implements Visitor over a stream.
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
d := json.NewDecoder(v.Reader)
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
for {
ext := runtime.RawExtension{}
if err := d.Decode(&ext); err != nil {

154
pkg/util/yaml/decoder.go Normal file
View File

@ -0,0 +1,154 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yaml
import (
"bufio"
"bytes"
"encoding/json"
"io"
"unicode"
"github.com/ghodss/yaml"
"github.com/golang/glog"
)
// YAMLToJSONDecoder decodes YAML documents from an io.Reader by
// separating individual documents. It first converts the YAML
// body to JSON, then unmarshals the JSON.
type YAMLToJSONDecoder struct {
scanner *bufio.Scanner
}
// NewYAMLToJSONDecoder decodes YAML documents from the provided
// stream in chunks by converting each document (as defined by
// the YAML spec) into its own chunk, converting it to JSON via
// yaml.YAMLToJSON, and then passing it to json.Decoder.
func NewYAMLToJSONDecoder(r io.Reader) *YAMLToJSONDecoder {
scanner := bufio.NewScanner(r)
scanner.Split(splitYAMLDocument)
return &YAMLToJSONDecoder{
scanner: scanner,
}
}
// Decode reads a YAML document as JSON from the stream or returns
// an error. The decoding rules match json.Unmarshal, not
// yaml.Unmarshal.
func (d *YAMLToJSONDecoder) Decode(into interface{}) error {
if d.scanner.Scan() {
data, err := yaml.YAMLToJSON(d.scanner.Bytes())
if err != nil {
return err
}
return json.Unmarshal(data, into)
}
err := d.scanner.Err()
if err == nil {
err = io.EOF
}
return err
}
const yamlSeparator = "\n---"
// splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents.
func splitYAMLDocument(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
sep := len([]byte(yamlSeparator))
if i := bytes.Index(data, []byte(yamlSeparator)); i >= 0 {
// We have a potential document terminator
i += sep
after := data[i:]
if len(after) == 0 {
// we can't read any more characters
if atEOF {
return len(data), data[:len(data)-sep], nil
}
return 0, nil, nil
}
if j := bytes.IndexByte(after, '\n'); j >= 0 {
return i + j + 1, data[0 : i-sep], nil
}
return 0, nil, nil
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
// decoder is a convenience interface for Decode.
type decoder interface {
Decode(into interface{}) error
}
// YAMLOrJSONDecoder attempts to decode a stream of JSON documents or
// YAML documents by sniffing for a leading { character.
type YAMLOrJSONDecoder struct {
r io.Reader
bufferSize int
decoder decoder
}
// NewYAMLOrJSONDecoder returns a decoder that will process YAML documents
// or JSON documents from the given reader as a stream. bufferSize determines
// how far into the stream the decoder will look to figure out whether this
// is a JSON stream (has whitespace followed by an open brace).
func NewYAMLOrJSONDecoder(r io.Reader, bufferSize int) *YAMLOrJSONDecoder {
return &YAMLOrJSONDecoder{
r: r,
bufferSize: bufferSize,
}
}
// Decode unmarshals the next object from the underlying stream into the
// provide object, or returns an error.
func (d *YAMLOrJSONDecoder) Decode(into interface{}) error {
if d.decoder == nil {
buffer, isJSON := guessJSONStream(d.r, d.bufferSize)
if isJSON {
glog.V(4).Infof("decoding stream as JSON")
d.decoder = json.NewDecoder(buffer)
} else {
glog.V(4).Infof("decoding stream as YAML")
d.decoder = NewYAMLToJSONDecoder(buffer)
}
}
return d.decoder.Decode(into)
}
// guessJSONStream scans the provided reader up to size, looking
// for an open brace indicating this is JSON. It will return the
// bufio.Reader it creates for the consumer.
func guessJSONStream(r io.Reader, size int) (io.Reader, bool) {
buffer := bufio.NewReaderSize(r, size)
b, _ := buffer.Peek(size)
return buffer, hasPrefix(b, []byte("{"))
}
// Return true if the first non-whitespace bytes in buf is
// prefix
func hasPrefix(buf []byte, prefix []byte) bool {
buf = bytes.TrimLeftFunc(buf, unicode.IsSpace)
return bytes.HasPrefix(buf, prefix)
}

View File

@ -0,0 +1,199 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package yaml
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"testing"
)
func TestSplitYAMLDocument(t *testing.T) {
testCases := []struct {
input string
atEOF bool
expect string
adv int
}{
{"foo", true, "foo", 3},
{"fo", false, "", 0},
{"---", true, "---", 3},
{"---\n", true, "---\n", 4},
{"---\n", false, "", 0},
{"\n---\n", false, "", 5},
{"\n---\n", true, "", 5},
{"abc\n---\ndef", true, "abc", 8},
{"def", true, "def", 3},
{"", true, "", 0},
}
for i, testCase := range testCases {
adv, token, err := splitYAMLDocument([]byte(testCase.input), testCase.atEOF)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
if adv != testCase.adv {
t.Errorf("%d: advance did not match: %d %d", i, testCase.adv, adv)
}
if testCase.expect != string(token) {
t.Errorf("%d: token did not match: %q %q", i, testCase.expect, string(token))
}
}
}
func TestScanYAML(t *testing.T) {
s := bufio.NewScanner(bytes.NewReader([]byte(`---
stuff: 1
---
`)))
s.Split(splitYAMLDocument)
if !s.Scan() {
t.Fatalf("should have been able to scan")
}
t.Logf("scan: %s", s.Text())
if !s.Scan() {
t.Fatalf("should have been able to scan")
}
t.Logf("scan: %s", s.Text())
if s.Scan() {
t.Fatalf("scan should have been done")
}
if s.Err() != nil {
t.Fatalf("err should have been nil: %v", s.Err())
}
}
func TestDecodeYAML(t *testing.T) {
s := NewYAMLToJSONDecoder(bytes.NewReader([]byte(`---
stuff: 1
---
`)))
obj := generic{}
if err := s.Decode(&obj); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if fmt.Sprintf("%#v", obj) != `yaml.generic{"stuff":1}` {
t.Errorf("unexpected object: %#v", obj)
}
obj = generic{}
if err := s.Decode(&obj); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(obj) != 0 {
t.Fatalf("unexpected object: %#v", obj)
}
obj = generic{}
if err := s.Decode(&obj); err != io.EOF {
t.Fatalf("unexpected error: %v", err)
}
}
type generic map[string]interface{}
func TestYAMLOrJSONDecoder(t *testing.T) {
testCases := []struct {
input string
buffer int
isJSON bool
err bool
out []generic
}{
{` {"1":2}{"3":4}`, 2, true, false, []generic{
{"1": 2},
{"3": 4},
}},
{" \n{}", 3, true, false, []generic{
{},
}},
{" \na: b", 2, false, false, []generic{
{"a": "b"},
}},
{` \n{"a": "b"}`, 2, false, true, []generic{
{"a": "b"},
}},
{` {"a":"b"}`, 100, true, false, []generic{
{"a": "b"},
}},
{"", 1, false, false, []generic{}},
{"foo: bar\n---\nbaz: biz", 100, false, false, []generic{
{"foo": "bar"},
{"baz": "biz"},
}},
{"foo: bar\n---\n", 100, false, false, []generic{
{"foo": "bar"},
}},
{"foo: bar\n---", 100, false, false, []generic{
{"foo": "bar"},
}},
{"foo: bar\n--", 100, false, true, []generic{
{"foo": "bar"},
}},
{"foo: bar\n-", 100, false, true, []generic{
{"foo": "bar"},
}},
{"foo: bar\n", 100, false, false, []generic{
{"foo": "bar"},
}},
}
for i, testCase := range testCases {
decoder := NewYAMLOrJSONDecoder(bytes.NewReader([]byte(testCase.input)), testCase.buffer)
objs := []generic{}
var err error
for {
out := make(generic)
err = decoder.Decode(&out)
if err != nil {
break
}
objs = append(objs, out)
}
if err != io.EOF {
switch {
case testCase.err && err == nil:
t.Errorf("%d: unexpected non-error", i)
continue
case !testCase.err && err != nil:
t.Errorf("%d: unexpected error: %v", i, err)
continue
case err != nil:
continue
}
}
switch decoder.decoder.(type) {
case *YAMLToJSONDecoder:
if testCase.isJSON {
t.Errorf("%d: expected JSON decoder, got YAML", i)
}
case *json.Decoder:
if !testCase.isJSON {
t.Errorf("%d: expected YAML decoder, got JSON", i)
}
}
if fmt.Sprintf("%#v", testCase.out) != fmt.Sprintf("%#v", objs) {
t.Errorf("%d: objects were not equal: \n%#v\n%#v", i, testCase.out, objs)
}
}
}