diff --git a/examples/examples_test.go b/examples/examples_test.go index 3230eb76d4..e8286a468b 100644 --- a/examples/examples_test.go +++ b/examples/examples_test.go @@ -200,6 +200,7 @@ func TestExampleObjectSchemas(t *testing.T) { "http-liveness": &api.Pod{}, }, "../examples": { + "multi-pod": nil, "pod": &api.Pod{}, "replication": &api.ReplicationController{}, "scheduler-policy-config": &schedulerapi.Policy{}, diff --git a/examples/multi-pod.yaml b/examples/multi-pod.yaml new file mode 100644 index 0000000000..7f7d0a5745 --- /dev/null +++ b/examples/multi-pod.yaml @@ -0,0 +1,49 @@ +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + name: redis + redis-sentinel: "true" + role: master + name: redis-master +spec: + containers: + - name: master + image: kubernetes/redis:v1 + env: + - name: MASTER + value: "true" + ports: + - containerPort: 6379 + resources: + limits: + cpu: "0.5" + volumeMounts: + - mountPath: /redis-master-data + name: data + - name: sentinel + image: kubernetes/redis:v1 + env: + - name: SENTINEL + value: "true" + ports: + - containerPort: 26379 + volumes: + - name: data + emptyDir: {} +--- +apiVersion: v1 +kind: Pod +metadata: + labels: + name: redis-proxy + role: proxy + name: redis-proxy +spec: + containers: + - name: proxy + image: kubernetes/redis-proxy:v1 + ports: + - containerPort: 6379 + name: api diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 2a52ca7af1..335ccdb5af 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -417,6 +417,21 @@ for version in "${kube_api_versions[@]}"; do # Post-condition: no POD is running kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" '' + ### Create two PODs from 1 yaml file + # Pre-condition: no POD is running + kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" '' + # Command + kubectl create -f examples/multi-pod.yaml "${kube_flags[@]}" + # Post-condition: valid-pod and redis-proxy PODs are running + kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" 'redis-master:redis-proxy:' + + ### Delete two PODs from 1 yaml file + # Pre-condition: redis-master and redis-proxy PODs are running + kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" 'redis-master:redis-proxy:' + # Command + kubectl delete -f examples/multi-pod.yaml "${kube_flags[@]}" + # Post-condition: no PODs are running + kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" '' ############## # Namespaces # diff --git a/pkg/kubectl/resource/builder.go b/pkg/kubectl/resource/builder.go index c30eb8e1b7..045b2d89f0 100644 --- a/pkg/kubectl/resource/builder.go +++ b/pkg/kubectl/resource/builder.go @@ -86,9 +86,9 @@ func (b *Builder) Schema(schema validation.Schema) *Builder { return b } -// Filename is parameters passed via a filename argument which may be URLs, the "-" argument indicating -// STDIN, or paths to files or directories. If ContinueOnError() is set prior to this method being called, -// objects on the path that are unrecognized will be ignored (but logged at V(2)). +// FilenameParam groups input in two categories: URLs and files (files, directories, STDIN) +// If ContinueOnError() is set prior to this method, objects on the path that are not +// recognized will be ignored (but logged at V(2)). func (b *Builder) FilenameParam(paths ...string) *Builder { for _, s := range paths { switch { @@ -124,7 +124,9 @@ func (b *Builder) URL(urls ...*url.URL) *Builder { // prior to this method being called, objects in the stream that are unrecognized // will be ignored (but logged at V(2)). func (b *Builder) Stdin() *Builder { - return b.Stream(os.Stdin, "STDIN") + b.stream = true + b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.continueOnError, b.schema)) + return b } // Stream will read objects from the provided reader, and if an error occurs will @@ -133,16 +135,18 @@ func (b *Builder) Stdin() *Builder { // will be ignored (but logged at V(2)). func (b *Builder) Stream(r io.Reader, name string) *Builder { b.stream = true - b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, b.schema, name, b.continueOnError)) + b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.continueOnError, b.schema)) return b } -// Path is a set of filesystem paths that may be files containing one or more -// resources. If ContinueOnError() is set prior to this method being called, -// objects on the path that are unrecognized will be ignored (but logged at V(2)). +// Path accepts a set of paths that may be files, directories (all can containing +// one or more resources). Creates a FileVisitor for each file and then each +// FileVisitor is streaming the content to a StreamVisitor. If ContinueOnError() is set +// prior to this method being called, objects on the path that are unrecognized will be +// ignored (but logged at V(2)). func (b *Builder) Path(paths ...string) *Builder { for _, p := range paths { - i, err := os.Stat(p) + _, err := os.Stat(p) if os.IsNotExist(err) { b.errs = append(b.errs, fmt.Errorf("the path %q does not exist", p)) continue @@ -151,26 +155,16 @@ func (b *Builder) Path(paths ...string) *Builder { b.errs = append(b.errs, fmt.Errorf("the path %q cannot be accessed: %v", p, err)) continue } - var visitor Visitor - if i.IsDir() { - b.dir = true - visitor = &DirectoryVisitor{ - Mapper: b.mapper, - Path: p, - Extensions: []string{".json", ".yaml", ".yml"}, - Recursive: false, - IgnoreErrors: b.continueOnError, - Schema: b.schema, - } - } else { - visitor = &PathVisitor{ - Mapper: b.mapper, - Path: p, - IgnoreErrors: b.continueOnError, - Schema: b.schema, - } + + visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".yaml", ".yml"}, b.continueOnError, b.schema) + if err != nil { + b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err)) } - b.paths = append(b.paths, visitor) + if len(visitors) > 1 { + b.dir = true + } + + b.paths = append(b.paths, visitors...) } return b } @@ -207,8 +201,8 @@ func (b *Builder) Selector(selector labels.Selector) *Builder { return b } -// The namespace that these resources should be assumed to under - used by DefaultNamespace() -// and RequireNamespace() +// NamespaceParam accepts the namespace that these resources should be +// considered under from - used by DefaultNamespace() and RequireNamespace() func (b *Builder) NamespaceParam(namespace string) *Builder { b.namespace = namespace return b diff --git a/pkg/kubectl/resource/visitor.go b/pkg/kubectl/resource/visitor.go index a6013fa978..5e54959189 100644 --- a/pkg/kubectl/resource/visitor.go +++ b/pkg/kubectl/resource/visitor.go @@ -36,6 +36,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +const constSTDINstr string = "STDIN" + // Visitor lets clients walk a list of resources. // TODO: we should rethink how we handle errors in the visit loop // (See https://github.com/GoogleCloudPlatform/kubernetes/pull/9357#issuecomment-109600305) @@ -204,101 +206,6 @@ func ValidateSchema(data []byte, schema validation.Schema) error { return nil } -// PathVisitor visits a given path and returns an object representing the file -// at that path. -type PathVisitor struct { - *Mapper - // The file path to load - Path string - // Whether to ignore files that are not recognized as API objects - IgnoreErrors bool - // Schema for validation - Schema validation.Schema -} - -func (v *PathVisitor) Visit(fn VisitorFunc) error { - data, err := ioutil.ReadFile(v.Path) - if err != nil { - return fmt.Errorf("unable to read %q: %v", v.Path, err) - } - if err := ValidateSchema(data, v.Schema); err != nil { - return fmt.Errorf("error validating %q: %v", v.Path, err) - } - info, err := v.Mapper.InfoForData(data, v.Path) - if err != nil { - if !v.IgnoreErrors { - return err - } - fmt.Fprintf(os.Stderr, "error: unable to load file %q: %v\n", v.Path, err) - return nil - } - return fn(info) -} - -// DirectoryVisitor loads the specified files from a directory and passes them -// to visitors. -type DirectoryVisitor struct { - *Mapper - // The directory or file to start from - Path string - // Whether directories are recursed - Recursive bool - // The file extensions to include. If empty, all files are read. - Extensions []string - // Whether to ignore files that are not recognized as API objects - IgnoreErrors bool - // Schema for validation - Schema validation.Schema -} - -func (v *DirectoryVisitor) ignoreFile(path string) bool { - if len(v.Extensions) == 0 { - return false - } - ext := filepath.Ext(path) - for _, s := range v.Extensions { - if s == ext { - return false - } - } - return true -} - -func (v *DirectoryVisitor) Visit(fn VisitorFunc) error { - return filepath.Walk(v.Path, func(path string, fi os.FileInfo, err error) error { - if err != nil { - return err - } - - if fi.IsDir() { - if path != v.Path && !v.Recursive { - return filepath.SkipDir - } - return nil - } - if v.ignoreFile(path) { - return nil - } - - data, err := ioutil.ReadFile(path) - if err != nil { - return fmt.Errorf("unable to read %q: %v", path, err) - } - if err := ValidateSchema(data, v.Schema); err != nil { - return fmt.Errorf("error validating %q: %v", path, err) - } - info, err := v.Mapper.InfoForData(data, path) - if err != nil { - if !v.IgnoreErrors { - return err - } - fmt.Fprintf(os.Stderr, "error: unable to load file %q: %v\n", path, err) - return nil - } - return fn(info) - }) -} - // URLVisitor downloads the contents of a URL, and if successful, returns // an info object representing the downloaded object. type URLVisitor struct { @@ -437,6 +344,85 @@ func (v FlattenListVisitor) Visit(fn VisitorFunc) error { }) } +func ignoreFile(path string, extensions []string) bool { + if len(extensions) == 0 { + return false + } + ext := filepath.Ext(path) + for _, s := range extensions { + if s == ext { + return false + } + } + return true +} + +// FileVisitorForSTDIN return a special FileVisitor just for STDIN +func FileVisitorForSTDIN(mapper *Mapper, ignoreErrors bool, schema validation.Schema) Visitor { + return &FileVisitor{ + Path: constSTDINstr, + StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, ignoreErrors, schema), + } +} + +// ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path. +// After FileVisitors open the files, they will pass a io.Reader to a StreamVisitor to do the reading. (stdin +// is also taken care of). Paths argument also accepts a single file, and will return a single visitor +func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, ignoreErrors bool, schema validation.Schema) ([]Visitor, error) { + var visitors []Visitor + err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + + if fi.IsDir() { + if path != paths && !recursive { + return filepath.SkipDir + } + return nil + } + if ignoreFile(path, extensions) { + return nil + } + + visitor := &FileVisitor{ + Path: path, + StreamVisitor: NewStreamVisitor(nil, mapper, path, ignoreErrors, schema), + } + + visitors = append(visitors, visitor) + return nil + }) + + if err != nil { + return nil, err + } + return visitors, nil +} + +// FileVisitor is wrapping around a StreamVisitor, to handle open/close files +type FileVisitor struct { + Path string + *StreamVisitor +} + +// Visit in a FileVisitor is just taking care of opening/closing files +func (v *FileVisitor) Visit(fn VisitorFunc) error { + var f *os.File + if v.Path == constSTDINstr { + f = os.Stdin + } else { + var err error + if f, err = os.Open(v.Path); err != nil { + return fmt.Errorf("unable to open %q: %v", v.Path, err) + } + } + defer f.Close() + v.StreamVisitor.Reader = f + + return v.StreamVisitor.Visit(fn) +} + // StreamVisitor reads objects from an io.Reader and walks them. A stream visitor can only be // visited once. // TODO: depends on objects being in JSON format before being passed to decode - need to implement @@ -450,16 +436,18 @@ type StreamVisitor struct { Schema validation.Schema } -// NewStreamVisitor creates a visitor that will return resources that were encoded into the provided -// stream. If ignoreErrors is set, unrecognized or invalid objects will be skipped and logged. An -// empty stream is treated as an error for now. -// TODO: convert ignoreErrors into a func(data, error, count) bool that consumers can use to decide -// what to do with ignored errors. -func NewStreamVisitor(r io.Reader, mapper *Mapper, schema validation.Schema, source string, ignoreErrors bool) Visitor { - return &StreamVisitor{r, mapper, source, ignoreErrors, schema} +// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same. +func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors bool, schema validation.Schema) *StreamVisitor { + return &StreamVisitor{ + Reader: r, + Mapper: mapper, + Source: source, + IgnoreErrors: ignoreErrors, + Schema: schema, + } } -// Visit implements Visitor over a stream. +// Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream. func (v *StreamVisitor) Visit(fn VisitorFunc) error { d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096) for { @@ -490,7 +478,6 @@ func (v *StreamVisitor) Visit(fn VisitorFunc) error { return err } } - return nil } func UpdateObjectNamespace(info *Info) error {