Correctly parse multiple resources from files
Refactored how files, directories and stdin are handled. Every file must pass through the FileVisitor and then streamed through StreamVisitor. FileVisitor takes care of opening/closing files and StreamVisitor is parsing multiple resources.
This commit is contained in:
parent
b3aad24d40
commit
697e07f864
@ -200,6 +200,7 @@ func TestExampleObjectSchemas(t *testing.T) {
|
||||
"http-liveness": &api.Pod{},
|
||||
},
|
||||
"../examples": {
|
||||
"multi-pod": nil,
|
||||
"pod": &api.Pod{},
|
||||
"replication": &api.ReplicationController{},
|
||||
"scheduler-policy-config": &schedulerapi.Policy{},
|
||||
|
49
examples/multi-pod.yaml
Normal file
49
examples/multi-pod.yaml
Normal file
@ -0,0 +1,49 @@
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
labels:
|
||||
name: redis
|
||||
redis-sentinel: "true"
|
||||
role: master
|
||||
name: redis-master
|
||||
spec:
|
||||
containers:
|
||||
- name: master
|
||||
image: kubernetes/redis:v1
|
||||
env:
|
||||
- name: MASTER
|
||||
value: "true"
|
||||
ports:
|
||||
- containerPort: 6379
|
||||
resources:
|
||||
limits:
|
||||
cpu: "0.5"
|
||||
volumeMounts:
|
||||
- mountPath: /redis-master-data
|
||||
name: data
|
||||
- name: sentinel
|
||||
image: kubernetes/redis:v1
|
||||
env:
|
||||
- name: SENTINEL
|
||||
value: "true"
|
||||
ports:
|
||||
- containerPort: 26379
|
||||
volumes:
|
||||
- name: data
|
||||
emptyDir: {}
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
labels:
|
||||
name: redis-proxy
|
||||
role: proxy
|
||||
name: redis-proxy
|
||||
spec:
|
||||
containers:
|
||||
- name: proxy
|
||||
image: kubernetes/redis-proxy:v1
|
||||
ports:
|
||||
- containerPort: 6379
|
||||
name: api
|
@ -417,6 +417,21 @@ for version in "${kube_api_versions[@]}"; do
|
||||
# Post-condition: no POD is running
|
||||
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||
|
||||
### Create two PODs from 1 yaml file
|
||||
# Pre-condition: no POD is running
|
||||
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||
# Command
|
||||
kubectl create -f examples/multi-pod.yaml "${kube_flags[@]}"
|
||||
# Post-condition: valid-pod and redis-proxy PODs are running
|
||||
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" 'redis-master:redis-proxy:'
|
||||
|
||||
### Delete two PODs from 1 yaml file
|
||||
# Pre-condition: redis-master and redis-proxy PODs are running
|
||||
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" 'redis-master:redis-proxy:'
|
||||
# Command
|
||||
kubectl delete -f examples/multi-pod.yaml "${kube_flags[@]}"
|
||||
# Post-condition: no PODs are running
|
||||
kube::test::get_object_assert pods "{{range.items}}{{$id_field}}:{{end}}" ''
|
||||
|
||||
##############
|
||||
# Namespaces #
|
||||
|
@ -86,9 +86,9 @@ func (b *Builder) Schema(schema validation.Schema) *Builder {
|
||||
return b
|
||||
}
|
||||
|
||||
// Filename is parameters passed via a filename argument which may be URLs, the "-" argument indicating
|
||||
// STDIN, or paths to files or directories. If ContinueOnError() is set prior to this method being called,
|
||||
// objects on the path that are unrecognized will be ignored (but logged at V(2)).
|
||||
// FilenameParam groups input in two categories: URLs and files (files, directories, STDIN)
|
||||
// If ContinueOnError() is set prior to this method, objects on the path that are not
|
||||
// recognized will be ignored (but logged at V(2)).
|
||||
func (b *Builder) FilenameParam(paths ...string) *Builder {
|
||||
for _, s := range paths {
|
||||
switch {
|
||||
@ -124,7 +124,9 @@ func (b *Builder) URL(urls ...*url.URL) *Builder {
|
||||
// prior to this method being called, objects in the stream that are unrecognized
|
||||
// will be ignored (but logged at V(2)).
|
||||
func (b *Builder) Stdin() *Builder {
|
||||
return b.Stream(os.Stdin, "STDIN")
|
||||
b.stream = true
|
||||
b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.continueOnError, b.schema))
|
||||
return b
|
||||
}
|
||||
|
||||
// Stream will read objects from the provided reader, and if an error occurs will
|
||||
@ -133,16 +135,18 @@ func (b *Builder) Stdin() *Builder {
|
||||
// will be ignored (but logged at V(2)).
|
||||
func (b *Builder) Stream(r io.Reader, name string) *Builder {
|
||||
b.stream = true
|
||||
b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, b.schema, name, b.continueOnError))
|
||||
b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.continueOnError, b.schema))
|
||||
return b
|
||||
}
|
||||
|
||||
// Path is a set of filesystem paths that may be files containing one or more
|
||||
// resources. If ContinueOnError() is set prior to this method being called,
|
||||
// objects on the path that are unrecognized will be ignored (but logged at V(2)).
|
||||
// Path accepts a set of paths that may be files, directories (all can containing
|
||||
// one or more resources). Creates a FileVisitor for each file and then each
|
||||
// FileVisitor is streaming the content to a StreamVisitor. If ContinueOnError() is set
|
||||
// prior to this method being called, objects on the path that are unrecognized will be
|
||||
// ignored (but logged at V(2)).
|
||||
func (b *Builder) Path(paths ...string) *Builder {
|
||||
for _, p := range paths {
|
||||
i, err := os.Stat(p)
|
||||
_, err := os.Stat(p)
|
||||
if os.IsNotExist(err) {
|
||||
b.errs = append(b.errs, fmt.Errorf("the path %q does not exist", p))
|
||||
continue
|
||||
@ -151,26 +155,16 @@ func (b *Builder) Path(paths ...string) *Builder {
|
||||
b.errs = append(b.errs, fmt.Errorf("the path %q cannot be accessed: %v", p, err))
|
||||
continue
|
||||
}
|
||||
var visitor Visitor
|
||||
if i.IsDir() {
|
||||
b.dir = true
|
||||
visitor = &DirectoryVisitor{
|
||||
Mapper: b.mapper,
|
||||
Path: p,
|
||||
Extensions: []string{".json", ".yaml", ".yml"},
|
||||
Recursive: false,
|
||||
IgnoreErrors: b.continueOnError,
|
||||
Schema: b.schema,
|
||||
}
|
||||
} else {
|
||||
visitor = &PathVisitor{
|
||||
Mapper: b.mapper,
|
||||
Path: p,
|
||||
IgnoreErrors: b.continueOnError,
|
||||
Schema: b.schema,
|
||||
}
|
||||
|
||||
visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".yaml", ".yml"}, b.continueOnError, b.schema)
|
||||
if err != nil {
|
||||
b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err))
|
||||
}
|
||||
b.paths = append(b.paths, visitor)
|
||||
if len(visitors) > 1 {
|
||||
b.dir = true
|
||||
}
|
||||
|
||||
b.paths = append(b.paths, visitors...)
|
||||
}
|
||||
return b
|
||||
}
|
||||
@ -207,8 +201,8 @@ func (b *Builder) Selector(selector labels.Selector) *Builder {
|
||||
return b
|
||||
}
|
||||
|
||||
// The namespace that these resources should be assumed to under - used by DefaultNamespace()
|
||||
// and RequireNamespace()
|
||||
// NamespaceParam accepts the namespace that these resources should be
|
||||
// considered under from - used by DefaultNamespace() and RequireNamespace()
|
||||
func (b *Builder) NamespaceParam(namespace string) *Builder {
|
||||
b.namespace = namespace
|
||||
return b
|
||||
|
@ -36,6 +36,8 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const constSTDINstr string = "STDIN"
|
||||
|
||||
// Visitor lets clients walk a list of resources.
|
||||
// TODO: we should rethink how we handle errors in the visit loop
|
||||
// (See https://github.com/GoogleCloudPlatform/kubernetes/pull/9357#issuecomment-109600305)
|
||||
@ -204,101 +206,6 @@ func ValidateSchema(data []byte, schema validation.Schema) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PathVisitor visits a given path and returns an object representing the file
|
||||
// at that path.
|
||||
type PathVisitor struct {
|
||||
*Mapper
|
||||
// The file path to load
|
||||
Path string
|
||||
// Whether to ignore files that are not recognized as API objects
|
||||
IgnoreErrors bool
|
||||
// Schema for validation
|
||||
Schema validation.Schema
|
||||
}
|
||||
|
||||
func (v *PathVisitor) Visit(fn VisitorFunc) error {
|
||||
data, err := ioutil.ReadFile(v.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read %q: %v", v.Path, err)
|
||||
}
|
||||
if err := ValidateSchema(data, v.Schema); err != nil {
|
||||
return fmt.Errorf("error validating %q: %v", v.Path, err)
|
||||
}
|
||||
info, err := v.Mapper.InfoForData(data, v.Path)
|
||||
if err != nil {
|
||||
if !v.IgnoreErrors {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "error: unable to load file %q: %v\n", v.Path, err)
|
||||
return nil
|
||||
}
|
||||
return fn(info)
|
||||
}
|
||||
|
||||
// DirectoryVisitor loads the specified files from a directory and passes them
|
||||
// to visitors.
|
||||
type DirectoryVisitor struct {
|
||||
*Mapper
|
||||
// The directory or file to start from
|
||||
Path string
|
||||
// Whether directories are recursed
|
||||
Recursive bool
|
||||
// The file extensions to include. If empty, all files are read.
|
||||
Extensions []string
|
||||
// Whether to ignore files that are not recognized as API objects
|
||||
IgnoreErrors bool
|
||||
// Schema for validation
|
||||
Schema validation.Schema
|
||||
}
|
||||
|
||||
func (v *DirectoryVisitor) ignoreFile(path string) bool {
|
||||
if len(v.Extensions) == 0 {
|
||||
return false
|
||||
}
|
||||
ext := filepath.Ext(path)
|
||||
for _, s := range v.Extensions {
|
||||
if s == ext {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (v *DirectoryVisitor) Visit(fn VisitorFunc) error {
|
||||
return filepath.Walk(v.Path, func(path string, fi os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if fi.IsDir() {
|
||||
if path != v.Path && !v.Recursive {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if v.ignoreFile(path) {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to read %q: %v", path, err)
|
||||
}
|
||||
if err := ValidateSchema(data, v.Schema); err != nil {
|
||||
return fmt.Errorf("error validating %q: %v", path, err)
|
||||
}
|
||||
info, err := v.Mapper.InfoForData(data, path)
|
||||
if err != nil {
|
||||
if !v.IgnoreErrors {
|
||||
return err
|
||||
}
|
||||
fmt.Fprintf(os.Stderr, "error: unable to load file %q: %v\n", path, err)
|
||||
return nil
|
||||
}
|
||||
return fn(info)
|
||||
})
|
||||
}
|
||||
|
||||
// URLVisitor downloads the contents of a URL, and if successful, returns
|
||||
// an info object representing the downloaded object.
|
||||
type URLVisitor struct {
|
||||
@ -437,6 +344,85 @@ func (v FlattenListVisitor) Visit(fn VisitorFunc) error {
|
||||
})
|
||||
}
|
||||
|
||||
func ignoreFile(path string, extensions []string) bool {
|
||||
if len(extensions) == 0 {
|
||||
return false
|
||||
}
|
||||
ext := filepath.Ext(path)
|
||||
for _, s := range extensions {
|
||||
if s == ext {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// FileVisitorForSTDIN return a special FileVisitor just for STDIN
|
||||
func FileVisitorForSTDIN(mapper *Mapper, ignoreErrors bool, schema validation.Schema) Visitor {
|
||||
return &FileVisitor{
|
||||
Path: constSTDINstr,
|
||||
StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, ignoreErrors, schema),
|
||||
}
|
||||
}
|
||||
|
||||
// ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path.
|
||||
// After FileVisitors open the files, they will pass a io.Reader to a StreamVisitor to do the reading. (stdin
|
||||
// is also taken care of). Paths argument also accepts a single file, and will return a single visitor
|
||||
func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, ignoreErrors bool, schema validation.Schema) ([]Visitor, error) {
|
||||
var visitors []Visitor
|
||||
err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if fi.IsDir() {
|
||||
if path != paths && !recursive {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if ignoreFile(path, extensions) {
|
||||
return nil
|
||||
}
|
||||
|
||||
visitor := &FileVisitor{
|
||||
Path: path,
|
||||
StreamVisitor: NewStreamVisitor(nil, mapper, path, ignoreErrors, schema),
|
||||
}
|
||||
|
||||
visitors = append(visitors, visitor)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return visitors, nil
|
||||
}
|
||||
|
||||
// FileVisitor is wrapping around a StreamVisitor, to handle open/close files
|
||||
type FileVisitor struct {
|
||||
Path string
|
||||
*StreamVisitor
|
||||
}
|
||||
|
||||
// Visit in a FileVisitor is just taking care of opening/closing files
|
||||
func (v *FileVisitor) Visit(fn VisitorFunc) error {
|
||||
var f *os.File
|
||||
if v.Path == constSTDINstr {
|
||||
f = os.Stdin
|
||||
} else {
|
||||
var err error
|
||||
if f, err = os.Open(v.Path); err != nil {
|
||||
return fmt.Errorf("unable to open %q: %v", v.Path, err)
|
||||
}
|
||||
}
|
||||
defer f.Close()
|
||||
v.StreamVisitor.Reader = f
|
||||
|
||||
return v.StreamVisitor.Visit(fn)
|
||||
}
|
||||
|
||||
// StreamVisitor reads objects from an io.Reader and walks them. A stream visitor can only be
|
||||
// visited once.
|
||||
// TODO: depends on objects being in JSON format before being passed to decode - need to implement
|
||||
@ -450,16 +436,18 @@ type StreamVisitor struct {
|
||||
Schema validation.Schema
|
||||
}
|
||||
|
||||
// NewStreamVisitor creates a visitor that will return resources that were encoded into the provided
|
||||
// stream. If ignoreErrors is set, unrecognized or invalid objects will be skipped and logged. An
|
||||
// empty stream is treated as an error for now.
|
||||
// TODO: convert ignoreErrors into a func(data, error, count) bool that consumers can use to decide
|
||||
// what to do with ignored errors.
|
||||
func NewStreamVisitor(r io.Reader, mapper *Mapper, schema validation.Schema, source string, ignoreErrors bool) Visitor {
|
||||
return &StreamVisitor{r, mapper, source, ignoreErrors, schema}
|
||||
// NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same.
|
||||
func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors bool, schema validation.Schema) *StreamVisitor {
|
||||
return &StreamVisitor{
|
||||
Reader: r,
|
||||
Mapper: mapper,
|
||||
Source: source,
|
||||
IgnoreErrors: ignoreErrors,
|
||||
Schema: schema,
|
||||
}
|
||||
}
|
||||
|
||||
// Visit implements Visitor over a stream.
|
||||
// Visit implements Visitor over a stream. StreamVisitor is able to distinct multiple resources in one stream.
|
||||
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
|
||||
d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
|
||||
for {
|
||||
@ -490,7 +478,6 @@ func (v *StreamVisitor) Visit(fn VisitorFunc) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpdateObjectNamespace(info *Info) error {
|
||||
|
Loading…
Reference in New Issue
Block a user