From 29dc7f6ec20aecf8887ed897ba3445d9ec64d042 Mon Sep 17 00:00:00 2001 From: feihujiang Date: Mon, 15 Jun 2015 10:48:56 +0800 Subject: [PATCH] Make a change to visitor to allow it to accept an error, like Go's path walker --- pkg/kubectl/cmd/annotation.go | 11 ++- pkg/kubectl/cmd/clusterinfo.go | 5 +- pkg/kubectl/cmd/create.go | 5 +- pkg/kubectl/cmd/delete.go | 10 ++- pkg/kubectl/cmd/get.go | 5 +- pkg/kubectl/cmd/label.go | 5 +- pkg/kubectl/cmd/replace.go | 10 ++- pkg/kubectl/resource/builder.go | 6 +- pkg/kubectl/resource/builder_test.go | 43 ++-------- pkg/kubectl/resource/result.go | 5 +- pkg/kubectl/resource/selector.go | 2 +- pkg/kubectl/resource/visitor.go | 122 ++++++++++++++++----------- 12 files changed, 128 insertions(+), 101 deletions(-) diff --git a/pkg/kubectl/cmd/annotation.go b/pkg/kubectl/cmd/annotation.go index 89a1ae56f99..789df577ecd 100644 --- a/pkg/kubectl/cmd/annotation.go +++ b/pkg/kubectl/cmd/annotation.go @@ -166,16 +166,19 @@ func (o AnnotateOptions) RunAnnotate() error { if err := r.Err(); err != nil { return err } - return r.Visit(func(info *resource.Info) error { - _, err := cmdutil.UpdateObject(info, func(obj runtime.Object) error { + return r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } + _, uErr := cmdutil.UpdateObject(info, func(obj runtime.Object) error { err := o.updateAnnotations(obj) if err != nil { return err } return nil }) - if err != nil { - return err + if uErr != nil { + return uErr } return nil }) diff --git a/pkg/kubectl/cmd/clusterinfo.go b/pkg/kubectl/cmd/clusterinfo.go index 2141e1b2bc4..da2e582ef0b 100644 --- a/pkg/kubectl/cmd/clusterinfo.go +++ b/pkg/kubectl/cmd/clusterinfo.go @@ -68,7 +68,10 @@ func RunClusterInfo(factory *cmdutil.Factory, out io.Writer, cmd *cobra.Command) SelectorParam("kubernetes.io/cluster-service=true"). ResourceTypeOrNameArgs(false, []string{"services"}...). Latest() - b.Do().Visit(func(r *resource.Info) error { + b.Do().Visit(func(r *resource.Info, err error) error { + if err != nil { + return err + } services := r.Object.(*api.ServiceList).Items for _, service := range services { var link string diff --git a/pkg/kubectl/cmd/create.go b/pkg/kubectl/cmd/create.go index 5f2f10736c7..a0c5a6bd459 100644 --- a/pkg/kubectl/cmd/create.go +++ b/pkg/kubectl/cmd/create.go @@ -96,7 +96,10 @@ func RunCreate(f *cmdutil.Factory, cmd *cobra.Command, out io.Writer) error { } count := 0 - err = r.Visit(func(info *resource.Info) error { + err = r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } data, err := info.Mapping.Codec.Encode(info.Object) if err != nil { return cmdutil.AddSourceToErr("creating", info.Source, err) diff --git a/pkg/kubectl/cmd/delete.go b/pkg/kubectl/cmd/delete.go index 66304647260..d8a9652c384 100644 --- a/pkg/kubectl/cmd/delete.go +++ b/pkg/kubectl/cmd/delete.go @@ -132,7 +132,10 @@ func ReapResult(r *resource.Result, f *cmdutil.Factory, out io.Writer, isDefault if ignoreNotFound { r = r.IgnoreErrors(errors.IsNotFound) } - err := r.Visit(func(info *resource.Info) error { + err := r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } found++ reaper, err := f.Reaper(info.Mapping) if err != nil { @@ -166,7 +169,10 @@ func DeleteResult(r *resource.Result, out io.Writer, ignoreNotFound bool, shortO if ignoreNotFound { r = r.IgnoreErrors(errors.IsNotFound) } - err := r.Visit(func(info *resource.Info) error { + err := r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } found++ return deleteResource(info, out, shortOutput, mapper) }) diff --git a/pkg/kubectl/cmd/get.go b/pkg/kubectl/cmd/get.go index 24c20af5d84..debecf209b0 100644 --- a/pkg/kubectl/cmd/get.go +++ b/pkg/kubectl/cmd/get.go @@ -205,7 +205,10 @@ func RunGet(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string } // use the default printer for each object - return b.Do().Visit(func(r *resource.Info) error { + return b.Do().Visit(func(r *resource.Info, err error) error { + if err != nil { + return err + } printer, err := f.PrinterForMapping(cmd, r.Mapping, allNamespaces) if err != nil { return err diff --git a/pkg/kubectl/cmd/label.go b/pkg/kubectl/cmd/label.go index 50862008680..0dc7d74aa89 100644 --- a/pkg/kubectl/cmd/label.go +++ b/pkg/kubectl/cmd/label.go @@ -199,7 +199,10 @@ func RunLabel(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []stri } // TODO: support bulk generic output a la Get - return r.Visit(func(info *resource.Info) error { + return r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } obj, err := cmdutil.UpdateObject(info, func(obj runtime.Object) error { err := labelFunc(obj, overwrite, resourceVersion, labels, remove) if err != nil { diff --git a/pkg/kubectl/cmd/replace.go b/pkg/kubectl/cmd/replace.go index 5dc82f0be48..cdad1857757 100644 --- a/pkg/kubectl/cmd/replace.go +++ b/pkg/kubectl/cmd/replace.go @@ -115,7 +115,10 @@ func RunReplace(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []st return err } - return r.Visit(func(info *resource.Info) error { + return r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } data, err := info.Mapping.Codec.Encode(info.Object) if err != nil { return cmdutil.AddSourceToErr("replacing", info.Source, err) @@ -196,7 +199,10 @@ func forceReplace(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args [] } count := 0 - err = r.Visit(func(info *resource.Info) error { + err = r.Visit(func(info *resource.Info, err error) error { + if err != nil { + return err + } data, err := info.Mapping.Codec.Encode(info.Object) if err != nil { return err diff --git a/pkg/kubectl/resource/builder.go b/pkg/kubectl/resource/builder.go index c28146cd21d..be5c08be270 100644 --- a/pkg/kubectl/resource/builder.go +++ b/pkg/kubectl/resource/builder.go @@ -133,7 +133,7 @@ func (b *Builder) URL(urls ...*url.URL) *Builder { // will be ignored (but logged at V(2)). func (b *Builder) Stdin() *Builder { b.stream = true - b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.continueOnError, b.schema)) + b.paths = append(b.paths, FileVisitorForSTDIN(b.mapper, b.schema)) return b } @@ -143,7 +143,7 @@ func (b *Builder) Stdin() *Builder { // will be ignored (but logged at V(2)). func (b *Builder) Stream(r io.Reader, name string) *Builder { b.stream = true - b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.continueOnError, b.schema)) + b.paths = append(b.paths, NewStreamVisitor(r, b.mapper, name, b.schema)) return b } @@ -164,7 +164,7 @@ func (b *Builder) Path(paths ...string) *Builder { continue } - visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".stdin", ".yaml", ".yml"}, b.continueOnError, b.schema) + visitors, err := ExpandPathsToFileVisitors(b.mapper, p, false, []string{".json", ".stdin", ".yaml", ".yml"}, b.schema) if err != nil { b.errs = append(b.errs, fmt.Errorf("error reading %q: %v", p, err)) } diff --git a/pkg/kubectl/resource/builder_test.go b/pkg/kubectl/resource/builder_test.go index 21be7923197..0e104d0c282 100644 --- a/pkg/kubectl/resource/builder_test.go +++ b/pkg/kubectl/resource/builder_test.go @@ -162,7 +162,10 @@ type testVisitor struct { Infos []*Info } -func (v *testVisitor) Handle(info *Info) error { +func (v *testVisitor) Handle(info *Info, err error) error { + if err != nil { + return err + } v.Infos = append(v.Infos, info) return v.InjectErr } @@ -649,7 +652,7 @@ func TestContinueOnErrorVisitor(t *testing.T) { Do() count := 0 testErr := fmt.Errorf("test error") - err := req.Visit(func(_ *Info) error { + err := req.Visit(func(_ *Info, _ error) error { count++ if count > 1 { return testErr @@ -872,40 +875,6 @@ func TestLatest(t *testing.T) { } } -func TestIgnoreStreamErrors(t *testing.T) { - pods, svc := testData() - - r, w := io.Pipe() - go func() { - defer w.Close() - w.Write([]byte(`{}`)) - w.Write([]byte(runtime.EncodeOrDie(latest.Codec, &pods.Items[0]))) - }() - - r2, w2 := io.Pipe() - go func() { - defer w2.Close() - w2.Write([]byte(`{}`)) - w2.Write([]byte(runtime.EncodeOrDie(latest.Codec, &svc.Items[0]))) - }() - - b := NewBuilder(latest.RESTMapper, api.Scheme, fakeClient()). - ContinueOnError(). // TODO: order seems bad, but allows clients to determine what they want... - Stream(r, "1").Stream(r2, "2") - - test := &testVisitor{} - singular := false - - err := b.Do().IntoSingular(&singular).Visit(test.Handle) - if err != nil || singular || len(test.Infos) != 2 { - t.Fatalf("unexpected response: %v %t %#v", err, singular, test.Infos) - } - - if !api.Semantic.DeepDerivative([]runtime.Object{&pods.Items[0], &svc.Items[0]}, test.Objects()) { - t.Errorf("unexpected visited objects: %#v", test.Objects()) - } -} - func TestReceiveMultipleErrors(t *testing.T) { pods, svc := testData() @@ -931,7 +900,7 @@ func TestReceiveMultipleErrors(t *testing.T) { singular := false err := b.Do().IntoSingular(&singular).Visit(test.Handle) - if err == nil || singular || len(test.Infos) != 0 { + if err == nil || singular || len(test.Infos) != 2 { t.Fatalf("unexpected response: %v %t %#v", err, singular, test.Infos) } diff --git a/pkg/kubectl/resource/result.go b/pkg/kubectl/resource/result.go index 1e9e09a74d0..f8a8c688e19 100644 --- a/pkg/kubectl/resource/result.go +++ b/pkg/kubectl/resource/result.go @@ -98,7 +98,10 @@ func (r *Result) Infos() ([]*Info, error) { } infos := []*Info{} - err := r.visitor.Visit(func(info *Info) error { + err := r.visitor.Visit(func(info *Info, err error) error { + if err != nil { + return err + } infos = append(infos, info) return nil }) diff --git a/pkg/kubectl/resource/selector.go b/pkg/kubectl/resource/selector.go index d7861fd2151..da9a1f2ce5c 100644 --- a/pkg/kubectl/resource/selector.go +++ b/pkg/kubectl/resource/selector.go @@ -68,7 +68,7 @@ func (r *Selector) Visit(fn VisitorFunc) error { Object: list, ResourceVersion: resourceVersion, } - return fn(info) + return fn(info, nil) } func (r *Selector) Watch(resourceVersion string) (watch.Interface, error) { diff --git a/pkg/kubectl/resource/visitor.go b/pkg/kubectl/resource/visitor.go index 51e8ecc8524..f085d63df8d 100644 --- a/pkg/kubectl/resource/visitor.go +++ b/pkg/kubectl/resource/visitor.go @@ -26,8 +26,6 @@ import ( "os" "path/filepath" - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/validation" "k8s.io/kubernetes/pkg/runtime" @@ -39,14 +37,16 @@ import ( const constSTDINstr string = "STDIN" // Visitor lets clients walk a list of resources. -// TODO: we should rethink how we handle errors in the visit loop -// (See http://pr.k8s.io/9357#issuecomment-109600305) type Visitor interface { Visit(VisitorFunc) error } -// VisitorFunc implements the Visitor interface for a matching function -type VisitorFunc func(*Info) error +// VisitorFunc implements the Visitor interface for a matching function. +// If there was a problem walking a list of resources, the incoming error +// will describe the problem and the function can decide how to handle that error. +// A nil returned indicates to accept an error to continue loops even when errors happen. +// This is useful for ignoring certain kinds of errors or aggregating errors in some way. +type VisitorFunc func(*Info, error) error // Watchable describes a resource that can be watched for changes that occur on the server, // beginning after the provided resource version. @@ -96,7 +96,7 @@ func NewInfo(client RESTClient, mapping *meta.RESTMapping, namespace, name strin // Visit implements Visitor func (i *Info) Visit(fn VisitorFunc) error { - return fn(i) + return fn(i, nil) } // Get retrieves the object from the Namespace and Name fields @@ -180,8 +180,12 @@ type EagerVisitorList []Visitor func (l EagerVisitorList) Visit(fn VisitorFunc) error { errs := []error(nil) for i := range l { - if err := l[i].Visit(func(info *Info) error { - if err := fn(info); err != nil { + if err := l[i].Visit(func(info *Info, err error) error { + if err != nil { + errs = append(errs, err) + return nil + } + if err := fn(info, nil); err != nil { errs = append(errs, err) } return nil @@ -234,7 +238,7 @@ func (v *URLVisitor) Visit(fn VisitorFunc) error { if err != nil { return err } - return fn(info) + return fn(info, nil) } // DecoratedVisitor will invoke the decorators in order prior to invoking the visitor function @@ -256,13 +260,16 @@ func NewDecoratedVisitor(v Visitor, fn ...VisitorFunc) Visitor { // Visit implements Visitor func (v DecoratedVisitor) Visit(fn VisitorFunc) error { - return v.visitor.Visit(func(info *Info) error { + return v.visitor.Visit(func(info *Info, err error) error { + if err != nil { + return err + } for i := range v.decorators { - if err := v.decorators[i](info); err != nil { + if err := v.decorators[i](info, nil); err != nil { return err } } - return fn(info) + return fn(info, nil) }) } @@ -281,8 +288,12 @@ type ContinueOnErrorVisitor struct { // not being visited. func (v ContinueOnErrorVisitor) Visit(fn VisitorFunc) error { errs := []error{} - err := v.Visitor.Visit(func(info *Info) error { - if err := fn(info); err != nil { + err := v.Visitor.Visit(func(info *Info, err error) error { + if err != nil { + errs = append(errs, err) + return nil + } + if err := fn(info, nil); err != nil { errs = append(errs, err) } return nil @@ -314,13 +325,16 @@ func NewFlattenListVisitor(v Visitor, mapper *Mapper) Visitor { } func (v FlattenListVisitor) Visit(fn VisitorFunc) error { - return v.Visitor.Visit(func(info *Info) error { + return v.Visitor.Visit(func(info *Info, err error) error { + if err != nil { + return err + } if info.Object == nil { - return fn(info) + return fn(info, nil) } items, err := runtime.ExtractList(info.Object) if err != nil { - return fn(info) + return fn(info, nil) } if errs := runtime.DecodeList(items, struct { runtime.ObjectTyper @@ -336,7 +350,7 @@ func (v FlattenListVisitor) Visit(fn VisitorFunc) error { if len(info.ResourceVersion) != 0 { item.ResourceVersion = info.ResourceVersion } - if err := fn(item); err != nil { + if err := fn(item, nil); err != nil { return err } } @@ -358,17 +372,17 @@ func ignoreFile(path string, extensions []string) bool { } // FileVisitorForSTDIN return a special FileVisitor just for STDIN -func FileVisitorForSTDIN(mapper *Mapper, ignoreErrors bool, schema validation.Schema) Visitor { +func FileVisitorForSTDIN(mapper *Mapper, schema validation.Schema) Visitor { return &FileVisitor{ Path: constSTDINstr, - StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, ignoreErrors, schema), + StreamVisitor: NewStreamVisitor(nil, mapper, constSTDINstr, schema), } } // ExpandPathsToFileVisitors will return a slice of FileVisitors that will handle files from the provided path. // After FileVisitors open the files, they will pass a io.Reader to a StreamVisitor to do the reading. (stdin // is also taken care of). Paths argument also accepts a single file, and will return a single visitor -func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, ignoreErrors bool, schema validation.Schema) ([]Visitor, error) { +func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, extensions []string, schema validation.Schema) ([]Visitor, error) { var visitors []Visitor err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error { if err != nil { @@ -388,7 +402,7 @@ func ExpandPathsToFileVisitors(mapper *Mapper, paths string, recursive bool, ext visitor := &FileVisitor{ Path: path, - StreamVisitor: NewStreamVisitor(nil, mapper, path, ignoreErrors, schema), + StreamVisitor: NewStreamVisitor(nil, mapper, path, schema), } visitors = append(visitors, visitor) @@ -432,19 +446,17 @@ type StreamVisitor struct { io.Reader *Mapper - Source string - IgnoreErrors bool - Schema validation.Schema + Source string + Schema validation.Schema } // NewStreamVisitor is a helper function that is useful when we want to change the fields of the struct but keep calls the same. -func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, ignoreErrors bool, schema validation.Schema) *StreamVisitor { +func NewStreamVisitor(r io.Reader, mapper *Mapper, source string, schema validation.Schema) *StreamVisitor { return &StreamVisitor{ - Reader: r, - Mapper: mapper, - Source: source, - IgnoreErrors: ignoreErrors, - Schema: schema, + Reader: r, + Mapper: mapper, + Source: source, + Schema: schema, } } @@ -468,20 +480,21 @@ func (v *StreamVisitor) Visit(fn VisitorFunc) error { } info, err := v.InfoForData(ext.RawJSON, v.Source) if err != nil { - if v.IgnoreErrors { - fmt.Fprintf(os.Stderr, "error: could not read an encoded object: %v\n", err) - glog.V(4).Infof("Unreadable: %s", string(ext.RawJSON)) - continue + if fnErr := fn(info, err); fnErr != nil { + return fnErr } - return err + continue } - if err := fn(info); err != nil { + if err := fn(info, nil); err != nil { return err } } } -func UpdateObjectNamespace(info *Info) error { +func UpdateObjectNamespace(info *Info, err error) error { + if err != nil { + return err + } if info.Object != nil { return info.Mapping.MetadataAccessor.SetNamespace(info.Object, info.Namespace) } @@ -489,10 +502,13 @@ func UpdateObjectNamespace(info *Info) error { } // FilterNamespace omits the namespace if the object is not namespace scoped -func FilterNamespace(info *Info) error { +func FilterNamespace(info *Info, err error) error { + if err != nil { + return err + } if !info.Namespaced() { info.Namespace = "" - UpdateObjectNamespace(info) + UpdateObjectNamespace(info, nil) } return nil } @@ -500,13 +516,16 @@ func FilterNamespace(info *Info) error { // SetNamespace ensures that every Info object visited will have a namespace // set. If info.Object is set, it will be mutated as well. func SetNamespace(namespace string) VisitorFunc { - return func(info *Info) error { + return func(info *Info, err error) error { + if err != nil { + return err + } if !info.Namespaced() { return nil } if len(info.Namespace) == 0 { info.Namespace = namespace - UpdateObjectNamespace(info) + UpdateObjectNamespace(info, nil) } return nil } @@ -517,13 +536,16 @@ func SetNamespace(namespace string) VisitorFunc { // value, returns an error. This is intended to guard against administrators // accidentally operating on resources outside their namespace. func RequireNamespace(namespace string) VisitorFunc { - return func(info *Info) error { + return func(info *Info, err error) error { + if err != nil { + return err + } if !info.Namespaced() { return nil } if len(info.Namespace) == 0 { info.Namespace = namespace - UpdateObjectNamespace(info) + UpdateObjectNamespace(info, nil) return nil } if info.Namespace != namespace { @@ -535,7 +557,10 @@ func RequireNamespace(namespace string) VisitorFunc { // RetrieveLatest updates the Object on each Info by invoking a standard client // Get. -func RetrieveLatest(info *Info) error { +func RetrieveLatest(info *Info, err error) error { + if err != nil { + return err + } if len(info.Name) == 0 { return nil } @@ -552,7 +577,10 @@ func RetrieveLatest(info *Info) error { } // RetrieveLazy updates the object if it has not been loaded yet. -func RetrieveLazy(info *Info) error { +func RetrieveLazy(info *Info, err error) error { + if err != nil { + return err + } if info.Object == nil { return info.Get() }