kubectl wait: split condition functions into separate files
This commit is contained in:
		
							
								
								
									
										197
									
								
								staging/src/k8s.io/kubectl/pkg/cmd/wait/condition.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										197
									
								
								staging/src/k8s.io/kubectl/pkg/cmd/wait/condition.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,197 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package wait
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
 | 
						"k8s.io/cli-runtime/pkg/resource"
 | 
				
			||||||
 | 
						"k8s.io/client-go/tools/cache"
 | 
				
			||||||
 | 
						watchtools "k8s.io/client-go/tools/watch"
 | 
				
			||||||
 | 
						"k8s.io/kubectl/pkg/util/interrupt"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// ConditionalWait hold information to check an API status condition
 | 
				
			||||||
 | 
					type ConditionalWait struct {
 | 
				
			||||||
 | 
						conditionName   string
 | 
				
			||||||
 | 
						conditionStatus string
 | 
				
			||||||
 | 
						// errOut is written to if an error occurs
 | 
				
			||||||
 | 
						errOut io.Writer
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsConditionMet is a conditionfunc for waiting on an API condition to be met
 | 
				
			||||||
 | 
					func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
 | 
				
			||||||
 | 
						return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
 | 
				
			||||||
 | 
						conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if !found {
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						for _, conditionUncast := range conditions {
 | 
				
			||||||
 | 
							condition := conditionUncast.(map[string]interface{})
 | 
				
			||||||
 | 
							name, found, err := unstructured.NestedString(condition, "type")
 | 
				
			||||||
 | 
							if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							status, found, err := unstructured.NestedString(condition, "status")
 | 
				
			||||||
 | 
							if !found || err != nil {
 | 
				
			||||||
 | 
								continue
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
 | 
				
			||||||
 | 
							if found {
 | 
				
			||||||
 | 
								observedGeneration, found := getObservedGeneration(obj, condition)
 | 
				
			||||||
 | 
								if found && observedGeneration < generation {
 | 
				
			||||||
 | 
									return false, nil
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return strings.EqualFold(status, w.conditionStatus), nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return false, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
						if event.Type == watch.Error {
 | 
				
			||||||
 | 
							// keep waiting in the event we see an error - we expect the watch to be closed by
 | 
				
			||||||
 | 
							// the server
 | 
				
			||||||
 | 
							err := apierrors.FromObject(event.Object)
 | 
				
			||||||
 | 
							fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if event.Type == watch.Deleted {
 | 
				
			||||||
 | 
							// this will chain back out, result in another get and an return false back up the chain
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						obj := event.Object.(*unstructured.Unstructured)
 | 
				
			||||||
 | 
						return w.checkCondition(obj)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type isCondMetFunc func(event watch.Event) (bool, error)
 | 
				
			||||||
 | 
					type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
 | 
				
			||||||
 | 
					// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
 | 
				
			||||||
 | 
					func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
 | 
				
			||||||
 | 
						if len(info.Name) == 0 {
 | 
				
			||||||
 | 
							return info.Object, false, fmt.Errorf("resource name must be provided")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						endTime := time.Now().Add(o.Timeout)
 | 
				
			||||||
 | 
						timeout := time.Until(endTime)
 | 
				
			||||||
 | 
						errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
 | 
				
			||||||
 | 
						if o.Timeout == 0 {
 | 
				
			||||||
 | 
							// If timeout is zero we will fetch the object(s) once only and check
 | 
				
			||||||
 | 
							gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
 | 
				
			||||||
 | 
							if initObjGetErr != nil {
 | 
				
			||||||
 | 
								return nil, false, initObjGetErr
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if gottenObj == nil {
 | 
				
			||||||
 | 
								return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							conditionCheck, err := check(gottenObj)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return gottenObj, false, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if !conditionCheck {
 | 
				
			||||||
 | 
								return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return gottenObj, true, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if timeout < 0 {
 | 
				
			||||||
 | 
							// we're out of time
 | 
				
			||||||
 | 
							return info.Object, false, errWaitTimeoutWithName
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
 | 
				
			||||||
 | 
						fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
 | 
				
			||||||
 | 
						lw := &cache.ListWatch{
 | 
				
			||||||
 | 
							ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
				
			||||||
 | 
								options.FieldSelector = fieldSelector
 | 
				
			||||||
 | 
								return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
								options.FieldSelector = fieldSelector
 | 
				
			||||||
 | 
								return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 | 
				
			||||||
 | 
						preconditionFunc := func(store cache.Store) (bool, error) {
 | 
				
			||||||
 | 
							_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return true, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if !exists {
 | 
				
			||||||
 | 
								return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						intrCtx, cancel := context.WithCancel(ctx)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						var result runtime.Object
 | 
				
			||||||
 | 
						intr := interrupt.New(nil, cancel)
 | 
				
			||||||
 | 
						err := intr.Run(func() error {
 | 
				
			||||||
 | 
							ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
 | 
				
			||||||
 | 
							if ev != nil {
 | 
				
			||||||
 | 
								result = ev.Object
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if errors.Is(err, context.DeadlineExceeded) {
 | 
				
			||||||
 | 
								return errWaitTimeoutWithName
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
 | 
				
			||||||
 | 
								return result, false, errWaitTimeoutWithName
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return result, false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return result, true, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func extendErrWaitTimeout(err error, info *resource.Info) error {
 | 
				
			||||||
 | 
						return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
 | 
				
			||||||
 | 
						conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
 | 
				
			||||||
 | 
						if found {
 | 
				
			||||||
 | 
							return conditionObservedGeneration, true
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
 | 
				
			||||||
 | 
						return statusObservedGeneration, found
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										144
									
								
								staging/src/k8s.io/kubectl/pkg/cmd/wait/delete.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										144
									
								
								staging/src/k8s.io/kubectl/pkg/cmd/wait/delete.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,144 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package wait
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/fields"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/util/wait"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
 | 
						"k8s.io/cli-runtime/pkg/resource"
 | 
				
			||||||
 | 
						"k8s.io/client-go/tools/cache"
 | 
				
			||||||
 | 
						watchtools "k8s.io/client-go/tools/watch"
 | 
				
			||||||
 | 
						"k8s.io/kubectl/pkg/util/interrupt"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsDeleted is a condition func for waiting for something to be deleted
 | 
				
			||||||
 | 
					func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
 | 
				
			||||||
 | 
						if len(info.Name) == 0 {
 | 
				
			||||||
 | 
							return info.Object, false, fmt.Errorf("resource name must be provided")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(ctx, info.Name, metav1.GetOptions{})
 | 
				
			||||||
 | 
						if apierrors.IsNotFound(initObjGetErr) {
 | 
				
			||||||
 | 
							return info.Object, true, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if initObjGetErr != nil {
 | 
				
			||||||
 | 
							// TODO this could do something slightly fancier if we wish
 | 
				
			||||||
 | 
							return info.Object, false, initObjGetErr
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						resourceLocation := ResourceLocation{
 | 
				
			||||||
 | 
							GroupResource: info.Mapping.Resource.GroupResource(),
 | 
				
			||||||
 | 
							Namespace:     gottenObj.GetNamespace(),
 | 
				
			||||||
 | 
							Name:          gottenObj.GetName(),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if uid, ok := o.UIDMap[resourceLocation]; ok {
 | 
				
			||||||
 | 
							if gottenObj.GetUID() != uid {
 | 
				
			||||||
 | 
								return gottenObj, true, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						endTime := time.Now().Add(o.Timeout)
 | 
				
			||||||
 | 
						timeout := time.Until(endTime)
 | 
				
			||||||
 | 
						errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) // nolint:staticcheck // SA1019
 | 
				
			||||||
 | 
						if o.Timeout == 0 {
 | 
				
			||||||
 | 
							// If timeout is zero check if the object exists once only
 | 
				
			||||||
 | 
							if gottenObj == nil {
 | 
				
			||||||
 | 
								return nil, true, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if timeout < 0 {
 | 
				
			||||||
 | 
							// we're out of time
 | 
				
			||||||
 | 
							return info.Object, false, errWaitTimeoutWithName
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
 | 
				
			||||||
 | 
						lw := &cache.ListWatch{
 | 
				
			||||||
 | 
							ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
				
			||||||
 | 
								options.FieldSelector = fieldSelector
 | 
				
			||||||
 | 
								return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(ctx, options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
							WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
				
			||||||
 | 
								options.FieldSelector = fieldSelector
 | 
				
			||||||
 | 
								return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(ctx, options)
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 | 
				
			||||||
 | 
						preconditionFunc := func(store cache.Store) (bool, error) {
 | 
				
			||||||
 | 
							_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return true, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if !exists {
 | 
				
			||||||
 | 
								// since we're looking for it to disappear we just return here if it no longer exists
 | 
				
			||||||
 | 
								return true, nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						intrCtx, cancel := context.WithCancel(ctx)
 | 
				
			||||||
 | 
						defer cancel()
 | 
				
			||||||
 | 
						intr := interrupt.New(nil, cancel)
 | 
				
			||||||
 | 
						err := intr.Run(func() error {
 | 
				
			||||||
 | 
							_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
 | 
				
			||||||
 | 
							if errors.Is(err, context.DeadlineExceeded) {
 | 
				
			||||||
 | 
								return errWaitTimeoutWithName
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							if errors.Is(err, wait.ErrWaitTimeout) { // nolint:staticcheck // SA1019
 | 
				
			||||||
 | 
								return gottenObj, false, errWaitTimeoutWithName
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return gottenObj, false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return gottenObj, true, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Wait has helper methods for handling watches, including error handling.
 | 
				
			||||||
 | 
					type Wait struct {
 | 
				
			||||||
 | 
						errOut io.Writer
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
 | 
				
			||||||
 | 
					func (w Wait) IsDeleted(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
						switch event.Type {
 | 
				
			||||||
 | 
						case watch.Error:
 | 
				
			||||||
 | 
							// keep waiting in the event we see an error - we expect the watch to be closed by
 | 
				
			||||||
 | 
							// the server if the error is unrecoverable.
 | 
				
			||||||
 | 
							err := apierrors.FromObject(event.Object)
 | 
				
			||||||
 | 
							fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						case watch.Deleted:
 | 
				
			||||||
 | 
							return true, nil
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										120
									
								
								staging/src/k8s.io/kubectl/pkg/cmd/wait/json.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										120
									
								
								staging/src/k8s.io/kubectl/pkg/cmd/wait/json.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,120 @@
 | 
				
			|||||||
 | 
					/*
 | 
				
			||||||
 | 
					Copyright 2024 The Kubernetes Authors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Licensed under the Apache License, Version 2.0 (the "License");
 | 
				
			||||||
 | 
					you may not use this file except in compliance with the License.
 | 
				
			||||||
 | 
					You may obtain a copy of the License at
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    http://www.apache.org/licenses/LICENSE-2.0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Unless required by applicable law or agreed to in writing, software
 | 
				
			||||||
 | 
					distributed under the License is distributed on an "AS IS" BASIS,
 | 
				
			||||||
 | 
					WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
 | 
					See the License for the specific language governing permissions and
 | 
				
			||||||
 | 
					limitations under the License.
 | 
				
			||||||
 | 
					*/
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					package wait
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"context"
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"io"
 | 
				
			||||||
 | 
						"reflect"
 | 
				
			||||||
 | 
						"strings"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
 | 
						"k8s.io/apimachinery/pkg/watch"
 | 
				
			||||||
 | 
						"k8s.io/cli-runtime/pkg/resource"
 | 
				
			||||||
 | 
						"k8s.io/client-go/util/jsonpath"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// JSONPathWait holds a JSONPath Parser which has the ability
 | 
				
			||||||
 | 
					// to check for the JSONPath condition and compare with the API server provided JSON output.
 | 
				
			||||||
 | 
					type JSONPathWait struct {
 | 
				
			||||||
 | 
						matchAnyValue  bool
 | 
				
			||||||
 | 
						jsonPathValue  string
 | 
				
			||||||
 | 
						jsonPathParser *jsonpath.JSONPath
 | 
				
			||||||
 | 
						// errOut is written to if an error occurs
 | 
				
			||||||
 | 
						errOut io.Writer
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
 | 
				
			||||||
 | 
					func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
 | 
				
			||||||
 | 
						return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
 | 
				
			||||||
 | 
					// which check the watch event and check if a JSONPathWait condition is met
 | 
				
			||||||
 | 
					func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
 | 
				
			||||||
 | 
						if event.Type == watch.Error {
 | 
				
			||||||
 | 
							// keep waiting in the event we see an error - we expect the watch to be closed by
 | 
				
			||||||
 | 
							// the server
 | 
				
			||||||
 | 
							err := apierrors.FromObject(event.Object)
 | 
				
			||||||
 | 
							fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if event.Type == watch.Deleted {
 | 
				
			||||||
 | 
							// this will chain back out, result in another get and an return false back up the chain
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// event runtime Object can be safely asserted to Unstructed
 | 
				
			||||||
 | 
						// because we are working with dynamic client
 | 
				
			||||||
 | 
						obj := event.Object.(*unstructured.Unstructured)
 | 
				
			||||||
 | 
						return j.checkCondition(obj)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// checkCondition uses JSONPath parser to parse the JSON received from the API server
 | 
				
			||||||
 | 
					// and check if it matches the desired condition
 | 
				
			||||||
 | 
					func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
 | 
				
			||||||
 | 
						queryObj := obj.UnstructuredContent()
 | 
				
			||||||
 | 
						parseResults, err := j.jsonPathParser.FindResults(queryObj)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(parseResults) == 0 || len(parseResults[0]) == 0 {
 | 
				
			||||||
 | 
							return false, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if err := verifyParsedJSONPath(parseResults); err != nil {
 | 
				
			||||||
 | 
							return false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if j.matchAnyValue {
 | 
				
			||||||
 | 
							return true, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return false, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return isConditionMet, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// verifyParsedJSONPath verifies the JSON received from the API server is valid.
 | 
				
			||||||
 | 
					// It will only accept a single JSON
 | 
				
			||||||
 | 
					func verifyParsedJSONPath(results [][]reflect.Value) error {
 | 
				
			||||||
 | 
						if len(results) > 1 {
 | 
				
			||||||
 | 
							return errors.New("given jsonpath expression matches more than one list")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if len(results[0]) > 1 {
 | 
				
			||||||
 | 
							return errors.New("given jsonpath expression matches more than one value")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// compareResults will compare the reflect.Value from the result parsed by the
 | 
				
			||||||
 | 
					// JSONPath parser with the expected value given by the value
 | 
				
			||||||
 | 
					//
 | 
				
			||||||
 | 
					// Since this is coming from an unstructured this can only ever be a primitive,
 | 
				
			||||||
 | 
					// map[string]interface{}, or []interface{}.
 | 
				
			||||||
 | 
					// We do not support the last two and rely on fmt to handle conversion to string
 | 
				
			||||||
 | 
					// and compare the result with user input
 | 
				
			||||||
 | 
					func compareResults(r reflect.Value, expectedVal string) (bool, error) {
 | 
				
			||||||
 | 
						switch r.Interface().(type) {
 | 
				
			||||||
 | 
						case map[string]interface{}, []interface{}:
 | 
				
			||||||
 | 
							return false, errors.New("jsonpath leads to a nested object or list which is not supported")
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						s := fmt.Sprintf("%v", r.Interface())
 | 
				
			||||||
 | 
						return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -21,33 +21,25 @@ import (
 | 
				
			|||||||
	"errors"
 | 
						"errors"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
	"io"
 | 
						"io"
 | 
				
			||||||
	"reflect"
 | 
					 | 
				
			||||||
	"strings"
 | 
						"strings"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/spf13/cobra"
 | 
						"github.com/spf13/cobra"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
						apierrors "k8s.io/apimachinery/pkg/api/errors"
 | 
				
			||||||
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 | 
					 | 
				
			||||||
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 | 
					 | 
				
			||||||
	"k8s.io/apimachinery/pkg/fields"
 | 
					 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime"
 | 
						"k8s.io/apimachinery/pkg/runtime"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/runtime/schema"
 | 
						"k8s.io/apimachinery/pkg/runtime/schema"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/types"
 | 
						"k8s.io/apimachinery/pkg/types"
 | 
				
			||||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
					 | 
				
			||||||
	"k8s.io/apimachinery/pkg/watch"
 | 
					 | 
				
			||||||
	"k8s.io/cli-runtime/pkg/genericclioptions"
 | 
						"k8s.io/cli-runtime/pkg/genericclioptions"
 | 
				
			||||||
	"k8s.io/cli-runtime/pkg/genericiooptions"
 | 
						"k8s.io/cli-runtime/pkg/genericiooptions"
 | 
				
			||||||
	"k8s.io/cli-runtime/pkg/printers"
 | 
						"k8s.io/cli-runtime/pkg/printers"
 | 
				
			||||||
	"k8s.io/cli-runtime/pkg/resource"
 | 
						"k8s.io/cli-runtime/pkg/resource"
 | 
				
			||||||
	"k8s.io/client-go/dynamic"
 | 
						"k8s.io/client-go/dynamic"
 | 
				
			||||||
	"k8s.io/client-go/tools/cache"
 | 
					 | 
				
			||||||
	watchtools "k8s.io/client-go/tools/watch"
 | 
						watchtools "k8s.io/client-go/tools/watch"
 | 
				
			||||||
	"k8s.io/client-go/util/jsonpath"
 | 
						"k8s.io/client-go/util/jsonpath"
 | 
				
			||||||
	cmdget "k8s.io/kubectl/pkg/cmd/get"
 | 
						cmdget "k8s.io/kubectl/pkg/cmd/get"
 | 
				
			||||||
	cmdutil "k8s.io/kubectl/pkg/cmd/util"
 | 
						cmdutil "k8s.io/kubectl/pkg/cmd/util"
 | 
				
			||||||
	"k8s.io/kubectl/pkg/util/i18n"
 | 
						"k8s.io/kubectl/pkg/util/i18n"
 | 
				
			||||||
	"k8s.io/kubectl/pkg/util/interrupt"
 | 
					 | 
				
			||||||
	"k8s.io/kubectl/pkg/util/templates"
 | 
						"k8s.io/kubectl/pkg/util/templates"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -352,356 +344,3 @@ func (o *WaitOptions) RunWait() error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	return err
 | 
						return err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					 | 
				
			||||||
// IsDeleted is a condition func for waiting for something to be deleted
 | 
					 | 
				
			||||||
func IsDeleted(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
 | 
					 | 
				
			||||||
	if len(info.Name) == 0 {
 | 
					 | 
				
			||||||
		return info.Object, false, fmt.Errorf("resource name must be provided")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
 | 
					 | 
				
			||||||
	if apierrors.IsNotFound(initObjGetErr) {
 | 
					 | 
				
			||||||
		return info.Object, true, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if initObjGetErr != nil {
 | 
					 | 
				
			||||||
		// TODO this could do something slightly fancier if we wish
 | 
					 | 
				
			||||||
		return info.Object, false, initObjGetErr
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	resourceLocation := ResourceLocation{
 | 
					 | 
				
			||||||
		GroupResource: info.Mapping.Resource.GroupResource(),
 | 
					 | 
				
			||||||
		Namespace:     gottenObj.GetNamespace(),
 | 
					 | 
				
			||||||
		Name:          gottenObj.GetName(),
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if uid, ok := o.UIDMap[resourceLocation]; ok {
 | 
					 | 
				
			||||||
		if gottenObj.GetUID() != uid {
 | 
					 | 
				
			||||||
			return gottenObj, true, nil
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	endTime := time.Now().Add(o.Timeout)
 | 
					 | 
				
			||||||
	timeout := time.Until(endTime)
 | 
					 | 
				
			||||||
	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
 | 
					 | 
				
			||||||
	if o.Timeout == 0 {
 | 
					 | 
				
			||||||
		// If timeout is zero check if the object exists once only
 | 
					 | 
				
			||||||
		if gottenObj == nil {
 | 
					 | 
				
			||||||
			return nil, true, nil
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if timeout < 0 {
 | 
					 | 
				
			||||||
		// we're out of time
 | 
					 | 
				
			||||||
		return info.Object, false, errWaitTimeoutWithName
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
 | 
					 | 
				
			||||||
	lw := &cache.ListWatch{
 | 
					 | 
				
			||||||
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
					 | 
				
			||||||
			options.FieldSelector = fieldSelector
 | 
					 | 
				
			||||||
			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
					 | 
				
			||||||
			options.FieldSelector = fieldSelector
 | 
					 | 
				
			||||||
			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 | 
					 | 
				
			||||||
	preconditionFunc := func(store cache.Store) (bool, error) {
 | 
					 | 
				
			||||||
		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return true, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if !exists {
 | 
					 | 
				
			||||||
			// since we're looking for it to disappear we just return here if it no longer exists
 | 
					 | 
				
			||||||
			return true, nil
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	intrCtx, cancel := context.WithCancel(ctx)
 | 
					 | 
				
			||||||
	defer cancel()
 | 
					 | 
				
			||||||
	intr := interrupt.New(nil, cancel)
 | 
					 | 
				
			||||||
	err := intr.Run(func() error {
 | 
					 | 
				
			||||||
		_, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, Wait{errOut: o.ErrOut}.IsDeleted)
 | 
					 | 
				
			||||||
		if errors.Is(err, context.DeadlineExceeded) {
 | 
					 | 
				
			||||||
			return errWaitTimeoutWithName
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		if err == wait.ErrWaitTimeout {
 | 
					 | 
				
			||||||
			return gottenObj, false, errWaitTimeoutWithName
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return gottenObj, false, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return gottenObj, true, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// Wait has helper methods for handling watches, including error handling.
 | 
					 | 
				
			||||||
type Wait struct {
 | 
					 | 
				
			||||||
	errOut io.Writer
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// IsDeleted returns true if the object is deleted. It prints any errors it encounters.
 | 
					 | 
				
			||||||
func (w Wait) IsDeleted(event watch.Event) (bool, error) {
 | 
					 | 
				
			||||||
	switch event.Type {
 | 
					 | 
				
			||||||
	case watch.Error:
 | 
					 | 
				
			||||||
		// keep waiting in the event we see an error - we expect the watch to be closed by
 | 
					 | 
				
			||||||
		// the server if the error is unrecoverable.
 | 
					 | 
				
			||||||
		err := apierrors.FromObject(event.Object)
 | 
					 | 
				
			||||||
		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err)
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	case watch.Deleted:
 | 
					 | 
				
			||||||
		return true, nil
 | 
					 | 
				
			||||||
	default:
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
type isCondMetFunc func(event watch.Event) (bool, error)
 | 
					 | 
				
			||||||
type checkCondFunc func(obj *unstructured.Unstructured) (bool, error)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// getObjAndCheckCondition will make a List query to the API server to get the object and check if the condition is met using check function.
 | 
					 | 
				
			||||||
// If the condition is not met, it will make a Watch query to the server and pass in the condMet function
 | 
					 | 
				
			||||||
func getObjAndCheckCondition(ctx context.Context, info *resource.Info, o *WaitOptions, condMet isCondMetFunc, check checkCondFunc) (runtime.Object, bool, error) {
 | 
					 | 
				
			||||||
	if len(info.Name) == 0 {
 | 
					 | 
				
			||||||
		return info.Object, false, fmt.Errorf("resource name must be provided")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	endTime := time.Now().Add(o.Timeout)
 | 
					 | 
				
			||||||
	timeout := time.Until(endTime)
 | 
					 | 
				
			||||||
	errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info)
 | 
					 | 
				
			||||||
	if o.Timeout == 0 {
 | 
					 | 
				
			||||||
		// If timeout is zero we will fetch the object(s) once only and check
 | 
					 | 
				
			||||||
		gottenObj, initObjGetErr := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Get(context.Background(), info.Name, metav1.GetOptions{})
 | 
					 | 
				
			||||||
		if initObjGetErr != nil {
 | 
					 | 
				
			||||||
			return nil, false, initObjGetErr
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if gottenObj == nil {
 | 
					 | 
				
			||||||
			return nil, false, fmt.Errorf("condition not met for %s", info.ObjectName())
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		conditionCheck, err := check(gottenObj)
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return gottenObj, false, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if conditionCheck == false {
 | 
					 | 
				
			||||||
			return gottenObj, false, fmt.Errorf("condition not met for %s", info.ObjectName())
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return gottenObj, true, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if timeout < 0 {
 | 
					 | 
				
			||||||
		// we're out of time
 | 
					 | 
				
			||||||
		return info.Object, false, errWaitTimeoutWithName
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	mapping := info.ResourceMapping() // used to pass back meaningful errors if object disappears
 | 
					 | 
				
			||||||
	fieldSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()
 | 
					 | 
				
			||||||
	lw := &cache.ListWatch{
 | 
					 | 
				
			||||||
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 | 
					 | 
				
			||||||
			options.FieldSelector = fieldSelector
 | 
					 | 
				
			||||||
			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(context.TODO(), options)
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 | 
					 | 
				
			||||||
			options.FieldSelector = fieldSelector
 | 
					 | 
				
			||||||
			return o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(context.TODO(), options)
 | 
					 | 
				
			||||||
		},
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// this function is used to refresh the cache to prevent timeout waits on resources that have disappeared
 | 
					 | 
				
			||||||
	preconditionFunc := func(store cache.Store) (bool, error) {
 | 
					 | 
				
			||||||
		_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: info.Namespace, Name: info.Name})
 | 
					 | 
				
			||||||
		if err != nil {
 | 
					 | 
				
			||||||
			return true, err
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if !exists {
 | 
					 | 
				
			||||||
			return true, apierrors.NewNotFound(mapping.Resource.GroupResource(), info.Name)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	intrCtx, cancel := context.WithCancel(ctx)
 | 
					 | 
				
			||||||
	defer cancel()
 | 
					 | 
				
			||||||
	var result runtime.Object
 | 
					 | 
				
			||||||
	intr := interrupt.New(nil, cancel)
 | 
					 | 
				
			||||||
	err := intr.Run(func() error {
 | 
					 | 
				
			||||||
		ev, err := watchtools.UntilWithSync(intrCtx, lw, &unstructured.Unstructured{}, preconditionFunc, watchtools.ConditionFunc(condMet))
 | 
					 | 
				
			||||||
		if ev != nil {
 | 
					 | 
				
			||||||
			result = ev.Object
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if errors.Is(err, context.DeadlineExceeded) {
 | 
					 | 
				
			||||||
			return errWaitTimeoutWithName
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return err
 | 
					 | 
				
			||||||
	})
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		if err == wait.ErrWaitTimeout {
 | 
					 | 
				
			||||||
			return result, false, errWaitTimeoutWithName
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return result, false, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return result, true, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// ConditionalWait hold information to check an API status condition
 | 
					 | 
				
			||||||
type ConditionalWait struct {
 | 
					 | 
				
			||||||
	conditionName   string
 | 
					 | 
				
			||||||
	conditionStatus string
 | 
					 | 
				
			||||||
	// errOut is written to if an error occurs
 | 
					 | 
				
			||||||
	errOut io.Writer
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// IsConditionMet is a conditionfunc for waiting on an API condition to be met
 | 
					 | 
				
			||||||
func (w ConditionalWait) IsConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
 | 
					 | 
				
			||||||
	return getObjAndCheckCondition(ctx, info, o, w.isConditionMet, w.checkCondition)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
 | 
					 | 
				
			||||||
	conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return false, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if !found {
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	for _, conditionUncast := range conditions {
 | 
					 | 
				
			||||||
		condition := conditionUncast.(map[string]interface{})
 | 
					 | 
				
			||||||
		name, found, err := unstructured.NestedString(condition, "type")
 | 
					 | 
				
			||||||
		if !found || err != nil || !strings.EqualFold(name, w.conditionName) {
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		status, found, err := unstructured.NestedString(condition, "status")
 | 
					 | 
				
			||||||
		if !found || err != nil {
 | 
					 | 
				
			||||||
			continue
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		generation, found, _ := unstructured.NestedInt64(obj.Object, "metadata", "generation")
 | 
					 | 
				
			||||||
		if found {
 | 
					 | 
				
			||||||
			observedGeneration, found := getObservedGeneration(obj, condition)
 | 
					 | 
				
			||||||
			if found && observedGeneration < generation {
 | 
					 | 
				
			||||||
				return false, nil
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		return strings.EqualFold(status, w.conditionStatus), nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	return false, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) {
 | 
					 | 
				
			||||||
	if event.Type == watch.Error {
 | 
					 | 
				
			||||||
		// keep waiting in the event we see an error - we expect the watch to be closed by
 | 
					 | 
				
			||||||
		// the server
 | 
					 | 
				
			||||||
		err := apierrors.FromObject(event.Object)
 | 
					 | 
				
			||||||
		fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if event.Type == watch.Deleted {
 | 
					 | 
				
			||||||
		// this will chain back out, result in another get and an return false back up the chain
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	obj := event.Object.(*unstructured.Unstructured)
 | 
					 | 
				
			||||||
	return w.checkCondition(obj)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func extendErrWaitTimeout(err error, info *resource.Info) error {
 | 
					 | 
				
			||||||
	return fmt.Errorf("%s on %s/%s", err.Error(), info.Mapping.Resource.Resource, info.Name)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
func getObservedGeneration(obj *unstructured.Unstructured, condition map[string]interface{}) (int64, bool) {
 | 
					 | 
				
			||||||
	conditionObservedGeneration, found, _ := unstructured.NestedInt64(condition, "observedGeneration")
 | 
					 | 
				
			||||||
	if found {
 | 
					 | 
				
			||||||
		return conditionObservedGeneration, true
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	statusObservedGeneration, found, _ := unstructured.NestedInt64(obj.Object, "status", "observedGeneration")
 | 
					 | 
				
			||||||
	return statusObservedGeneration, found
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// JSONPathWait holds a JSONPath Parser which has the ability
 | 
					 | 
				
			||||||
// to check for the JSONPath condition and compare with the API server provided JSON output.
 | 
					 | 
				
			||||||
type JSONPathWait struct {
 | 
					 | 
				
			||||||
	matchAnyValue  bool
 | 
					 | 
				
			||||||
	jsonPathValue  string
 | 
					 | 
				
			||||||
	jsonPathParser *jsonpath.JSONPath
 | 
					 | 
				
			||||||
	// errOut is written to if an error occurs
 | 
					 | 
				
			||||||
	errOut io.Writer
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// IsJSONPathConditionMet fulfills the requirements of the interface ConditionFunc which provides condition check
 | 
					 | 
				
			||||||
func (j JSONPathWait) IsJSONPathConditionMet(ctx context.Context, info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) {
 | 
					 | 
				
			||||||
	return getObjAndCheckCondition(ctx, info, o, j.isJSONPathConditionMet, j.checkCondition)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// isJSONPathConditionMet is a helper function of IsJSONPathConditionMet
 | 
					 | 
				
			||||||
// which check the watch event and check if a JSONPathWait condition is met
 | 
					 | 
				
			||||||
func (j JSONPathWait) isJSONPathConditionMet(event watch.Event) (bool, error) {
 | 
					 | 
				
			||||||
	if event.Type == watch.Error {
 | 
					 | 
				
			||||||
		// keep waiting in the event we see an error - we expect the watch to be closed by
 | 
					 | 
				
			||||||
		// the server
 | 
					 | 
				
			||||||
		err := apierrors.FromObject(event.Object)
 | 
					 | 
				
			||||||
		fmt.Fprintf(j.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err)
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if event.Type == watch.Deleted {
 | 
					 | 
				
			||||||
		// this will chain back out, result in another get and an return false back up the chain
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	// event runtime Object can be safely asserted to Unstructed
 | 
					 | 
				
			||||||
	// because we are working with dynamic client
 | 
					 | 
				
			||||||
	obj := event.Object.(*unstructured.Unstructured)
 | 
					 | 
				
			||||||
	return j.checkCondition(obj)
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// checkCondition uses JSONPath parser to parse the JSON received from the API server
 | 
					 | 
				
			||||||
// and check if it matches the desired condition
 | 
					 | 
				
			||||||
func (j JSONPathWait) checkCondition(obj *unstructured.Unstructured) (bool, error) {
 | 
					 | 
				
			||||||
	queryObj := obj.UnstructuredContent()
 | 
					 | 
				
			||||||
	parseResults, err := j.jsonPathParser.FindResults(queryObj)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return false, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(parseResults) == 0 || len(parseResults[0]) == 0 {
 | 
					 | 
				
			||||||
		return false, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if err := verifyParsedJSONPath(parseResults); err != nil {
 | 
					 | 
				
			||||||
		return false, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if j.matchAnyValue {
 | 
					 | 
				
			||||||
		return true, nil
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	isConditionMet, err := compareResults(parseResults[0][0], j.jsonPathValue)
 | 
					 | 
				
			||||||
	if err != nil {
 | 
					 | 
				
			||||||
		return false, err
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return isConditionMet, nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// verifyParsedJSONPath verifies the JSON received from the API server is valid.
 | 
					 | 
				
			||||||
// It will only accept a single JSON
 | 
					 | 
				
			||||||
func verifyParsedJSONPath(results [][]reflect.Value) error {
 | 
					 | 
				
			||||||
	if len(results) > 1 {
 | 
					 | 
				
			||||||
		return errors.New("given jsonpath expression matches more than one list")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	if len(results[0]) > 1 {
 | 
					 | 
				
			||||||
		return errors.New("given jsonpath expression matches more than one value")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	return nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// compareResults will compare the reflect.Value from the result parsed by the
 | 
					 | 
				
			||||||
// JSONPath parser with the expected value given by the value
 | 
					 | 
				
			||||||
//
 | 
					 | 
				
			||||||
// Since this is coming from an unstructured this can only ever be a primitive,
 | 
					 | 
				
			||||||
// map[string]interface{}, or []interface{}.
 | 
					 | 
				
			||||||
// We do not support the last two and rely on fmt to handle conversion to string
 | 
					 | 
				
			||||||
// and compare the result with user input
 | 
					 | 
				
			||||||
func compareResults(r reflect.Value, expectedVal string) (bool, error) {
 | 
					 | 
				
			||||||
	switch r.Interface().(type) {
 | 
					 | 
				
			||||||
	case map[string]interface{}, []interface{}:
 | 
					 | 
				
			||||||
		return false, errors.New("jsonpath leads to a nested object or list which is not supported")
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	s := fmt.Sprintf("%v", r.Interface())
 | 
					 | 
				
			||||||
	return strings.TrimSpace(s) == strings.TrimSpace(expectedVal), nil
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user