Merge pull request #9781 from andronat/5840_yaml_2

Fixes ignored objects in one YAML file
pull/6/head
Alex Robinson 2015-06-29 14:29:59 -07:00
commit 5d13e78572
5 changed files with 180 additions and 134 deletions

View File

@ -200,6 +200,7 @@ func TestExampleObjectSchemas(t *testing.T) {
"http-liveness": &api.Pod{},
},
"../examples": {
"multi-pod": nil,
"pod": &api.Pod{},
"replication": &api.ReplicationController{},
"scheduler-policy-config": &schedulerapi.Policy{},

49
examples/multi-pod.yaml Normal file
View File

@ -0,0 +1,49 @@
---
apiVersion: v1
kind: Pod
metadata:
labels:
name: redis
redis-sentinel: "true"
role: master
name: redis-master
spec:
containers:
- name: master
image: kubernetes/redis:v1
env:
- name: MASTER
value: "true"
ports:
- containerPort: 6379
resources:
limits:
cpu: "0.5"
volumeMounts:
- mountPath: /redis-master-data
name: data
- name: sentinel
image: kubernetes/redis:v1
env:
- name: SENTINEL
value: "true"
ports:
- containerPort: 26379
volumes:
- name: data
emptyDir: {}
---
apiVersion: v1
kind: Pod
metadata:
labels:
name: redis-proxy
role: proxy
name: redis-proxy
spec:
containers:
- name: proxy
image: kubernetes/redis-proxy:v1
ports:
- containerPort: 6379
name: api

View File

@ -417,6 +417,21 @@ for version in "${kube_api_versions[@]}"; do
# Post-condition: no POD is running
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" ''
### Create two PODs from 1 yaml file
# Pre-condition: no POD is running
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" ''
# Command
kubectl create -f examples/multi-pod.yaml "${kube_flags[@]}"
# Post-condition: valid-pod and redis-proxy PODs are running
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" 'redis-master:redis-proxy:'
### Delete two PODs from 1 yaml file
# Pre-condition: redis-master and redis-proxy PODs are running
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" 'redis-master:redis-proxy:'
# Command
kubectl delete -f examples/multi-pod.yaml "${kube_flags[@]}"
# Post-condition: no PODs are running
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" ''
##############
# Namespaces #

View File

@ -86,9 +86,9 @@ func (b *Builder) Schema(schema validation.Schema) *Builder {
return b
}
// Filename is parameters passed via a filename argument which may be URLs, the "-" argument indicating
// STDIN, or paths to files or directories. If ContinueOnError() is set prior to this method being called,
// objects on the path that are unrecognized will be ignored (but logged at V(2)).
// FilenameParam groups input in two categories: URLs and files (files, directories, STDIN)
// If ContinueOnError() is set prior to this method, objects on the path that are not
// recognized will be ignored (but logged at V(2)).
func (b *Builder) FilenameParam(paths ...string) *Builder {
for _, s := range paths {
switch {
@ -124,7 +124,9 @@ func (b *Builder) URL(urls ...*url.URL) *Builder {
// prior to this method being called, objects in the stream that are unrecognized
// will be ignored (but logged at V(2)).
func (b *Builder) Stdin() *Builder {
return b.Stream(os.Stdin, "STDIN")
b.stream = true
b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.continueOnError, b.schema))
return b
}
// Stream will read objects from the provided reader, and if an error occurs will
@ -133,16 +135,18 @@ func (b *Builder) Stdin() *Builder {
// will be ignored (but logged at V(2)).
func (b *Builder) Stream(r io.Reader, name string) *Builder {
b.stream = true
b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, b.schema, name, b.continueOnError))
b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.continueOnError, b.schema))
return b
}
// Path is a set of filesystem paths that may be files containing one or more
// resources. If ContinueOnError() is set prior to this method being called,
// objects on the path that are unrecognized will be ignored (but logged at V(2)).
// Path accepts a set of paths that may be files, directories (all can containing
// one or more resources). Creates a FileVisitor for each file and then each
// FileVisitor is streaming the content to a StreamVisitor. If ContinueOnError() is set
// prior to this method being called, objects on the path that are unrecognized will be
// ignored (but logged at V(2)).
func (b *Builder) Path(paths ...string) *Builder {
for _, p := range paths {
i, err := os.Stat(p)
_, err := os.Stat(p)
if os.IsNotExist(err) {
b.errs = append(b.errs, fmt.Errorf("the path %q does not exist", p))
continue
@ -151,26 +155,16 @@ func (b *Builder) Path(paths ...string) *Builder {
b.errs = append(b.errs, fmt.Errorf("the path %q cannot be accessed: %v", p, err))
continue
}
var visitor Visitor
if i.IsDir() {
b.dir = true
visitor = &DirectoryVisitor{
Mapper: b.mapper,
Path: p,
Extensions: []string{".json", ".yaml", ".yml"},
Recursive: false,
IgnoreErrors: b.continueOnError,
Schema: b.schema,
}
} else {
visitor = &PathVisitor{
Mapper: b.mapper,
Path: p,
IgnoreErrors: b.continueOnError,
Schema: b.schema,
}
visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".yaml", ".yml"}, b.continueOnError, b.schema)
if err != nil {
b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err))
}
b.paths = append(b.paths, visitor)
if len(visitors) > 1 {
b.dir = true
}
b.paths = append(b.paths, visitors...)
}
return b
}
@ -207,8 +201,8 @@ func (b *Builder) Selector(selector labels.Selector) *Builder {
return b
}
// The namespace that these resources should be assumed to under - used by DefaultNamespace()
// and RequireNamespace()
// NamespaceParam accepts the namespace that these resources should be
// considered under from - used by DefaultNamespace() and RequireNamespace()
func (b *Builder) NamespaceParam(namespace string) *Builder {
b.namespace = namespace
return b

View File

@ -36,6 +36,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
const constSTDINstr string = "STDIN"
// Visitor lets clients walk a list of resources.
// TODO: we should rethink how we handle errors in the visit loop
// (See https://github.com/GoogleCloudPlatform/kubernetes/pull/9357#issuecomment-109600305)
@ -204,101 +206,6 @@ func ValidateSchema(data []byte, schema validation.Schema) error {
return nil
}
// PathVisitor visits a given path and returns an object representing the file
// at that path.
type PathVisitor struct {
*Mapper
// The file path to load
Path string
// Whether to ignore files that are not recognized as API objects
IgnoreErrors bool
// Schema for validation
Schema validation.Schema
}
func (v *PathVisitor) Visit(fn VisitorFunc) error {
data, err := ioutil.ReadFile(v.Path)
if err != nil {
return fmt.Errorf("unable to read %q: %v", v.Path, err)
}
if err := ValidateSchema(data, v.Schema); err != nil {
return fmt.Errorf("error validating %q: %v", v.Path, err)
}
info, err := v.Mapper.InfoForData(data, v.Path)
if err != nil {
if !v.IgnoreErrors {
return err
}
fmt.Fprintf(os.Stderr, "error: unable to load file %q: %v\n", v.Path, err)
return nil
}
return fn(info)
}
// DirectoryVisitor loads the specified files from a directory and passes them
// to visitors.
type DirectoryVisitor struct {
*Mapper
// The directory or file to start from
Path string
// Whether directories are recursed
Recursive bool
// The file extensions to include. If empty, all files are read.
Extensions []string
// Whether to ignore files that are not recognized as API objects
IgnoreErrors bool
// Schema for validation
Schema validation.Schema
}
func (v *DirectoryVisitor) ignoreFile(path string) bool {
if len(v.Extensions) == 0 {
return false
}
ext := filepath.Ext(path)
for _, s := range v.Extensions {
if s == ext {
return false
}
}
return true
}
func (v *DirectoryVisitor) Visit(fn VisitorFunc) error {
return filepath.Walk(v.Path, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.IsDir() {
if path != v.Path && !v.Recursive {
return filepath.SkipDir
}
return nil
}
if v.ignoreFile(path) {
return nil
}
data, err := ioutil.ReadFile(path)
if err != nil {
return fmt.Errorf("unable to read %q: %v", path, err)
}
if err := ValidateSchema(data, v.Schema); err != nil {
return fmt.Errorf("error validating %q: %v", path, err)
}
info, err := v.Mapper.InfoForData(data, path)
if err != nil {
if !v.IgnoreErrors {
return err
}
fmt.Fprintf(os.Stderr, "error: unable to load file %q: %v\n", path, err)
return nil
}
return fn(info)
})
}
// URLVisitor downloads the contents of a URL, and if successful, returns
// an info object representing the downloaded object.
type URLVisitor struct {
@ -437,6 +344,85 @@ func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
})
}
func ignoreFile(path string, extensions []string) bool {
if len(extensions) == 0 {
return false
}
ext := filepath.Ext(path)
for _, s := range extensions {
if s == ext {
return false
}
}
return true
}
// FileVisitorForSTDIN return a special FileVisitor just for STDIN
func FileVisitorForSTDIN(mapper *Mapper, ignoreErrors bool, schema validation.Schema) Visitor {
return &FileVisitor{
Path: constSTDINstr,
StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, ignoreErrors, schema),
}
}
// ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path.
// After FileVisitors open the files, they will pass a io.Reader to a StreamVisitor to do the reading. (stdin
// is also taken care of). Paths argument also accepts a single file, and will return a single visitor
func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, ignoreErrors bool, schema validation.Schema) ([]Visitor, error) {
var visitors []Visitor
err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
}
if fi.IsDir() {
if path != paths && !recursive {
return filepath.SkipDir
}
return nil
}
if ignoreFile(path, extensions) {
return nil
}
visitor := &FileVisitor{
Path: path,
StreamVisitor: NewStreamVisitor(nil, mapper, path, ignoreErrors, schema),
}
visitors = append(visitors, visitor)
return nil
})
if err != nil {
return nil, err
}
return visitors, nil
}
// FileVisitor is wrapping around a StreamVisitor, to handle open/close files
type FileVisitor struct {
Path string
*StreamVisitor
}
// Visit in a FileVisitor is just taking care of opening/closing files
func (v *FileVisitor) Visit(fn VisitorFunc) error {
var f *os.File
if v.Path == constSTDINstr {
f = os.Stdin
} else {
var err error
if f, err = os.Open(v.Path); err != nil {
return fmt.Errorf("unable to open %q: %v", v.Path, err)
}
}
defer f.Close()
v.StreamVisitor.Reader = f
return v.StreamVisitor.Visit(fn)
}
// StreamVisitor reads objects from an io.Reader and walks them. A stream visitor can only be
// visited once.
// TODO: depends on objects being in JSON format before being passed to decode - need to implement
@ -450,16 +436,18 @@ type StreamVisitor struct {
Schema validation.Schema
}
// NewStreamVisitor creates a visitor that will return resources that were encoded into the provided
// stream. If ignoreErrors is set, unrecognized or invalid objects will be skipped and logged. An
// empty stream is treated as an error for now.
// TODO: convert ignoreErrors into a func(data, error, count) bool that consumers can use to decide
// what to do with ignored errors.
func NewStreamVisitor(r io.Reader, mapper *Mapper, schema validation.Schema, source string, ignoreErrors bool) Visitor {
return &StreamVisitor{r, mapper, source, ignoreErrors, schema}
// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same.
func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors bool, schema validation.Schema) *StreamVisitor {
return &StreamVisitor{
Reader: r,
Mapper: mapper,
Source: source,
IgnoreErrors: ignoreErrors,
Schema: schema,
}
}
// Visit implements Visitor over a stream.
// Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream.
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
for {
@ -490,7 +478,6 @@ func (v *StreamVisitor) Visit(fn VisitorFunc) error {
return err
}
}
return nil
}
func UpdateObjectNamespace(info *Info) error {