From b358b2df022d078bc3e8a17993b9f97a2765fd6b Mon Sep 17 00:00:00 2001 From: ymqytw Date: Mon, 28 Nov 2016 19:18:01 -0800 Subject: [PATCH] make drain retry forever and use new timeout --- pkg/kubectl/cmd/drain.go | 106 ++++++++++++++++++++++++++-------- pkg/kubectl/cmd/drain_test.go | 2 +- 2 files changed, 82 insertions(+), 26 deletions(-) diff --git a/pkg/kubectl/cmd/drain.go b/pkg/kubectl/cmd/drain.go index 92d076a0852..5bf9422c7e5 100644 --- a/pkg/kubectl/cmd/drain.go +++ b/pkg/kubectl/cmd/drain.go @@ -185,7 +185,7 @@ func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command { cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.") cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).") cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.") - cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up on a delete, zero means determine a timeout from the size of the object") + cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up, zero means infinite") return cmd } @@ -238,16 +238,6 @@ func (o *DrainOptions) RunDrain() error { } err := o.deleteOrEvictPodsSimple() - // TODO: update IsTooManyRequests() when the TooManyRequests(429) error returned from the API server has a non-empty Reason field - for i := 1; i <= maxPatchRetry && apierrors.IsTooManyRequests(err); i++ { - if i > triesBeforeBackOff { - currBackOffPeriod := time.Duration(math.Exp2(float64(i-triesBeforeBackOff))) * backOffPeriod - fmt.Fprintf(o.errOut, "Retry in %v\n", currBackOffPeriod) - o.backOff.Sleep(currBackOffPeriod) - } - fmt.Fprintf(o.errOut, "Retrying\n") - err = o.deleteOrEvictPodsSimple() - } if err == nil { cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, false, "drained") } @@ -259,9 +249,7 @@ func (o *DrainOptions) deleteOrEvictPodsSimple() error { if err != nil { return err } - if o.Timeout == 0 { - o.Timeout = kubectl.Timeout + time.Duration(10*len(pods))*time.Second - } + err = o.deleteOrEvictPods(pods) if err != nil { pendingPods, newErr := o.getPodsForDeletion() @@ -470,31 +458,99 @@ func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error { return err } + getPodFn := func(namespace, name string) (*api.Pod, error) { + return o.client.Core().Pods(namespace).Get(name) + } + + if len(policyGroupVersion) > 0 { + return o.evictPods(pods, policyGroupVersion, getPodFn) + } else { + return o.deletePods(pods, getPodFn) + } +} + +func (o *DrainOptions) evictPods(pods []api.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error)) error { + doneCh := make(chan bool, len(pods)) + errCh := make(chan error, 1) + for _, pod := range pods { - if len(policyGroupVersion) > 0 { - err = o.evictPod(pod, policyGroupVersion) - } else { - err = o.deletePod(pod) + go func(pod api.Pod, doneCh chan bool, errCh chan error) { + var err error + for { + err = o.evictPod(pod, policyGroupVersion) + if err == nil { + break + } else if apierrors.IsTooManyRequests(err) { + time.Sleep(5 * time.Second) + } else { + errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + podArray := []api.Pod{pod} + _, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, getPodFn) + if err == nil { + doneCh <- true + } else { + errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, doneCh, errCh) + } + + doneCount := 0 + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if o.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = o.Timeout + } + for { + select { + case err := <-errCh: + return err + case <-doneCh: + doneCount++ + if doneCount == len(pods) { + return nil + } + case <-time.After(globalTimeout): + return fmt.Errorf("Drain did not complete within %v", globalTimeout) } + } +} + +func (o *DrainOptions) deletePods(pods []api.Pod, getPodFn func(namespace, name string) (*api.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if o.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = o.Timeout + } + for _, pod := range pods { + err := o.deletePod(pod) if err != nil { return err } } - - getPodFn := func(namespace, name string) (*api.Pod, error) { - return o.client.Core().Pods(namespace).Get(name) - } - _, err = o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn) + _, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn) return err } -func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) { +func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) { + var verbStr string + if usingEviction { + verbStr = "evicted" + } else { + verbStr = "deleted" + } err := wait.PollImmediate(interval, timeout, func() (bool, error) { pendingPods := []api.Pod{} for i, pod := range pods { p, err := getPodFn(pod.Namespace, pod.Name) if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { - cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted") + cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, verbStr) continue } else if err != nil { return false, err diff --git a/pkg/kubectl/cmd/drain_test.go b/pkg/kubectl/cmd/drain_test.go index ce58c3d151a..edd4da5e397 100644 --- a/pkg/kubectl/cmd/drain_test.go +++ b/pkg/kubectl/cmd/drain_test.go @@ -684,7 +684,7 @@ func TestDeletePods(t *testing.T) { o.mapper, _ = f.Object() o.out = os.Stdout _, pods := createPods(false) - pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, test.getPodFn) + pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn) if test.expectError { if err == nil {