
Automatic merge from submit-queue (batch tested with PRs 42162, 41973, 42015, 42115, 41923)

kubectl drain: make code reusable

DrainOptions requires a few fields to be set, and the expectation is that these are set as part of construction of the object. If they are set, then the drain code can be reused in other Kubernetes projects. This does not create a contract that DrainOptions should fulfill going forward, any more than any of the other types that happen to be exposed are part of the contract. Instead, this merely makes use outside the package possible.

```release-note
NONE
```

673 lines · 21 KiB · Go

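The practical effect is that another Kubernetes project can construct a `DrainOptions` itself instead of going through the cobra commands. A minimal sketch of that reuse, assuming the caller already has a `cmdutil.Factory` and a `*cobra.Command` to report usage errors against; the package alias, the helper name `drainNode`, and the option values are illustrative, not part of this change:

```go
package example

import (
	"os"
	"time"

	"github.com/spf13/cobra"

	kubectlcmd "k8s.io/kubernetes/pkg/kubectl/cmd"
	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)

// drainNode is a hypothetical helper in another project: it builds DrainOptions
// directly, lets SetupDrain resolve the clients and the node, then drains it.
func drainNode(f cmdutil.Factory, usageCmd *cobra.Command, nodeName string) error {
	opts := &kubectlcmd.DrainOptions{
		Factory:            f,
		Out:                os.Stdout,
		ErrOut:             os.Stderr,
		IgnoreDaemonsets:   true,
		DeleteLocalData:    true,
		GracePeriodSeconds: -1,
		Timeout:            5 * time.Minute,
	}
	// SetupDrain fills in the clientset, REST client, mapper and node info;
	// usageCmd is only used for formatting usage errors.
	if err := opts.SetupDrain(usageCmd, []string{nodeName}); err != nil {
		return err
	}
	return opts.RunDrain()
}
```

The same options value can also drive `RunCordonOrUncordon` directly if only cordoning is needed.
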
/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cmd

import (
	"errors"
	"fmt"
	"io"
	"math"
	"reflect"
	"strings"
	"time"

	"github.com/jonboulle/clockwork"
	"github.com/spf13/cobra"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	restclient "k8s.io/client-go/rest"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/policy"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	"k8s.io/kubernetes/pkg/kubectl"
	"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
	"k8s.io/kubernetes/pkg/kubectl/resource"
	"k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/kubernetes/pkg/util/i18n"
)

type DrainOptions struct {
	client             internalclientset.Interface
	restClient         *restclient.RESTClient
	Factory            cmdutil.Factory
	Force              bool
	GracePeriodSeconds int
	IgnoreDaemonsets   bool
	Timeout            time.Duration
	backOff            clockwork.Clock
	DeleteLocalData    bool
	mapper             meta.RESTMapper
	nodeInfo           *resource.Info
	Out                io.Writer
	ErrOut             io.Writer
	typer              runtime.ObjectTyper
}

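// Reuse note: Factory, Out, ErrOut and the flag-backed fields (Force,
// GracePeriodSeconds, IgnoreDaemonsets, DeleteLocalData, Timeout) are expected
// to be set by the caller, as the NewCmd* constructors below do; the unexported
// client, restClient, mapper, typer and nodeInfo fields are populated by
// SetupDrain.
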
// Takes a pod and returns a bool indicating whether or not to operate on the
// pod, an optional warning message, and an optional fatal error.
type podFilter func(api.Pod) (include bool, w *warning, f *fatal)
type warning struct {
	string
}
type fatal struct {
	string
}

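// exampleFilter is an illustrative podFilter, not part of the upstream file: it
// skips (without treating as fatal) any pod carrying a hypothetical opt-out
// annotation, and lets every other pod through.
func exampleFilter(pod api.Pod) (bool, *warning, *fatal) {
	if _, optOut := pod.ObjectMeta.Annotations["example.io/skip-drain"]; optOut {
		return false, &warning{"ignoring pods annotated with example.io/skip-drain"}, nil
	}
	return true, nil, nil
}
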
const (
	EvictionKind        = "Eviction"
	EvictionSubresource = "pods/eviction"

	kDaemonsetFatal      = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
	kDaemonsetWarning    = "Ignoring DaemonSet-managed pods"
	kLocalStorageFatal   = "pods with local storage (use --delete-local-data to override)"
	kLocalStorageWarning = "Deleting pods with local storage"
	kUnmanagedFatal      = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)"
	kUnmanagedWarning    = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet"
	kMaxNodeUpdateRetry  = 10
)

var (
	cordon_long = templates.LongDesc(`
		Mark node as unschedulable.`)

	cordon_example = templates.Examples(`
		# Mark node "foo" as unschedulable.
		kubectl cordon foo`)
)

func NewCmdCordon(f cmdutil.Factory, out io.Writer) *cobra.Command {
	options := &DrainOptions{Factory: f, Out: out}

	cmd := &cobra.Command{
		Use:     "cordon NODE",
		Short:   i18n.T("Mark node as unschedulable"),
		Long:    cordon_long,
		Example: cordon_example,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(options.SetupDrain(cmd, args))
			cmdutil.CheckErr(options.RunCordonOrUncordon(true))
		},
	}
	return cmd
}

var (
	uncordon_long = templates.LongDesc(`
		Mark node as schedulable.`)

	uncordon_example = templates.Examples(`
		# Mark node "foo" as schedulable.
		$ kubectl uncordon foo`)
)

func NewCmdUncordon(f cmdutil.Factory, out io.Writer) *cobra.Command {
	options := &DrainOptions{Factory: f, Out: out}

	cmd := &cobra.Command{
		Use:     "uncordon NODE",
		Short:   i18n.T("Mark node as schedulable"),
		Long:    uncordon_long,
		Example: uncordon_example,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(options.SetupDrain(cmd, args))
			cmdutil.CheckErr(options.RunCordonOrUncordon(false))
		},
	}
	return cmd
}

var (
	drain_long = templates.LongDesc(`
		Drain node in preparation for maintenance.

		The given node will be marked unschedulable to prevent new pods from arriving.
		'drain' evicts the pods if the APIServer supports eviction
		(http://kubernetes.io/docs/admin/disruptions/). Otherwise, it will use normal DELETE
		to delete the pods.
		The 'drain' evicts or deletes all pods except mirror pods (which cannot be deleted through
		the API server). If there are DaemonSet-managed pods, drain will not proceed
		without --ignore-daemonsets, and regardless it will not delete any
		DaemonSet-managed pods, because those pods would be immediately replaced by the
		DaemonSet controller, which ignores unschedulable markings. If there are any
		pods that are neither mirror pods nor managed by ReplicationController,
		ReplicaSet, DaemonSet, StatefulSet or Job, then drain will not delete any pods unless you
		use --force. --force will also allow deletion to proceed if the managing resource of one
		or more pods is missing.

		'drain' waits for graceful termination. You should not operate on the machine until
		the command completes.

		When you are ready to put the node back into service, use kubectl uncordon, which
		will make the node schedulable again.
		`)

	drain_example = templates.Examples(`
		# Drain node "foo", even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet on it.
		$ kubectl drain foo --force

		# As above, but abort if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet, and use a grace period of 15 minutes.
		$ kubectl drain foo --grace-period=900`)
)

func NewCmdDrain(f cmdutil.Factory, out, errOut io.Writer) *cobra.Command {
	options := &DrainOptions{Factory: f, Out: out, ErrOut: errOut, backOff: clockwork.NewRealClock()}

	cmd := &cobra.Command{
		Use:     "drain NODE",
		Short:   i18n.T("Drain node in preparation for maintenance"),
		Long:    drain_long,
		Example: drain_example,
		Run: func(cmd *cobra.Command, args []string) {
			cmdutil.CheckErr(options.SetupDrain(cmd, args))
			cmdutil.CheckErr(options.RunDrain())
		},
	}
	cmd.Flags().BoolVar(&options.Force, "force", false, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.")
	cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
	cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
	cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
	cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up, zero means infinite")
	return cmd
}

// SetupDrain populates some fields from the factory, grabs command line
// arguments and looks up the node using Builder
func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
	var err error
	if len(args) != 1 {
		return cmdutil.UsageError(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use))
	}

	if o.client, err = o.Factory.ClientSet(); err != nil {
		return err
	}

	o.restClient, err = o.Factory.RESTClient()
	if err != nil {
		return err
	}

	o.mapper, o.typer = o.Factory.Object()

	cmdNamespace, _, err := o.Factory.DefaultNamespace()
	if err != nil {
		return err
	}

	r := o.Factory.NewBuilder().
		NamespaceParam(cmdNamespace).DefaultNamespace().
		ResourceNames("node", args[0]).
		Do()

	if err = r.Err(); err != nil {
		return err
	}

	return r.Visit(func(info *resource.Info, err error) error {
		if err != nil {
			return err
		}
		o.nodeInfo = info
		return nil
	})
}

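// Reuse note: SetupDrain is where the unexported client, restClient, mapper,
// typer and nodeInfo fields get filled in, so callers outside kubectl should
// run it (any *cobra.Command works for the usage-error text) before calling
// RunDrain or RunCordonOrUncordon.
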
// RunDrain runs the 'drain' command
func (o *DrainOptions) RunDrain() error {
	if err := o.RunCordonOrUncordon(true); err != nil {
		return err
	}

	err := o.deleteOrEvictPodsSimple()
	if err == nil {
		cmdutil.PrintSuccess(o.mapper, false, o.Out, "node", o.nodeInfo.Name, false, "drained")
	}
	return err
}

func (o *DrainOptions) deleteOrEvictPodsSimple() error {
	pods, err := o.getPodsForDeletion()
	if err != nil {
		return err
	}

	err = o.deleteOrEvictPods(pods)
	if err != nil {
		pendingPods, newErr := o.getPodsForDeletion()
		if newErr != nil {
			return newErr
		}
		fmt.Fprintf(o.ErrOut, "There are pending pods when an error occurred: %v\n", err)
		for _, pendingPod := range pendingPods {
			fmt.Fprintf(o.ErrOut, "%s/%s\n", "pod", pendingPod.Name)
		}
	}
	return err
}

func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
	switch sr.Reference.Kind {
	case "ReplicationController":
		return o.client.Core().ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
	case "DaemonSet":
		return o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
	case "Job":
		return o.client.Batch().Jobs(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
	case "ReplicaSet":
		return o.client.Extensions().ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
	case "StatefulSet":
		return o.client.Apps().StatefulSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{})
	}
	return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
}

func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) {
	creatorRef, found := pod.ObjectMeta.Annotations[api.CreatedByAnnotation]
	if !found {
		return nil, nil
	}
	// Now verify that the specified creator actually exists.
	sr := &api.SerializedReference{}
	if err := runtime.DecodeInto(o.Factory.Decoder(true), []byte(creatorRef), sr); err != nil {
		return nil, err
	}
	// We assume the only reason for an error is because the controller is
	// gone/missing, not for any other cause.  TODO(mml): something more
	// sophisticated than this
	_, err := o.getController(sr)
	if err != nil {
		return nil, err
	}
	return sr, nil
}

func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) {
	// any finished pod can be removed
	if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
		return true, nil, nil
	}

	sr, err := o.getPodCreator(pod)
	if err != nil {
		// if we're forcing, remove orphaned pods with a warning
		if apierrors.IsNotFound(err) && o.Force {
			return true, &warning{err.Error()}, nil
		}
		return false, nil, &fatal{err.Error()}
	}
	if sr != nil {
		return true, nil, nil
	}
	if !o.Force {
		return false, nil, &fatal{kUnmanagedFatal}
	}
	return true, &warning{kUnmanagedWarning}, nil
}

func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) {
	// Note that we return false in cases where the pod is DaemonSet managed,
	// regardless of flags.  We never delete them, the only question is whether
	// their presence constitutes an error.
	//
	// The exception is for pods that are orphaned (the referencing
	// management resource - including DaemonSet - is not found).
	// Such pods will be deleted if --force is used.
	sr, err := o.getPodCreator(pod)
	if err != nil {
		// if we're forcing, remove orphaned pods with a warning
		if apierrors.IsNotFound(err) && o.Force {
			return true, &warning{err.Error()}, nil
		}
		return false, nil, &fatal{err.Error()}
	}
	if sr == nil || sr.Reference.Kind != "DaemonSet" {
		return true, nil, nil
	}
	if _, err := o.client.Extensions().DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name, metav1.GetOptions{}); err != nil {
		return false, nil, &fatal{err.Error()}
	}
	if !o.IgnoreDaemonsets {
		return false, nil, &fatal{kDaemonsetFatal}
	}
	return false, &warning{kDaemonsetWarning}, nil
}

func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) {
	if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found {
		return false, nil, nil
	}
	return true, nil, nil
}

func hasLocalStorage(pod api.Pod) bool {
	for _, volume := range pod.Spec.Volumes {
		if volume.EmptyDir != nil {
			return true
		}
	}

	return false
}

func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) {
	if !hasLocalStorage(pod) {
		return true, nil, nil
	}
	if !o.DeleteLocalData {
		return false, nil, &fatal{kLocalStorageFatal}
	}
	return true, &warning{kLocalStorageWarning}, nil
}

// Map of status message to a list of pod names having that status.
type podStatuses map[string][]string

func (ps podStatuses) Message() string {
	msgs := []string{}

	for key, pods := range ps {
		msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", ")))
	}
	return strings.Join(msgs, "; ")
}

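// For illustration only: a podStatuses holding two hypothetical pods under
// kLocalStorageFatal would render via Message() as
//
//	pods with local storage (use --delete-local-data to override): web-0, cache-1
//
// and distinct messages are joined with "; ".
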
// getPodsForDeletion returns all the pods we're going to delete.  If there are
// any pods preventing us from deleting, we return that list in an error.
func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
	podList, err := o.client.Core().Pods(metav1.NamespaceAll).List(metav1.ListOptions{
		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name}).String()})
	if err != nil {
		return pods, err
	}

	ws := podStatuses{}
	fs := podStatuses{}

	for _, pod := range podList.Items {
		podOk := true
		for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
			filterOk, w, f := filt(pod)

			podOk = podOk && filterOk
			if w != nil {
				ws[w.string] = append(ws[w.string], pod.Name)
			}
			if f != nil {
				fs[f.string] = append(fs[f.string], pod.Name)
			}
		}
		if podOk {
			pods = append(pods, pod)
		}
	}

	if len(fs) > 0 {
		return []api.Pod{}, errors.New(fs.Message())
	}
	if len(ws) > 0 {
		fmt.Fprintf(o.ErrOut, "WARNING: %s\n", ws.Message())
	}
	return pods, nil
}

func (o *DrainOptions) deletePod(pod api.Pod) error {
	deleteOptions := &metav1.DeleteOptions{}
	if o.GracePeriodSeconds >= 0 {
		gracePeriodSeconds := int64(o.GracePeriodSeconds)
		deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
	}
	return o.client.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
}

func (o *DrainOptions) evictPod(pod api.Pod, policyGroupVersion string) error {
	deleteOptions := &metav1.DeleteOptions{}
	if o.GracePeriodSeconds >= 0 {
		gracePeriodSeconds := int64(o.GracePeriodSeconds)
		deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
	}
	eviction := &policy.Eviction{
		TypeMeta: metav1.TypeMeta{
			APIVersion: policyGroupVersion,
			Kind:       EvictionKind,
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		},
		DeleteOptions: deleteOptions,
	}
	// Remember to change the URL manipulation func when Eviction's version changes
	return o.client.Policy().Evictions(eviction.Namespace).Evict(eviction)
}

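// For reference: Evict performs a POST against the pod's eviction subresource,
// roughly
//
//	POST /api/v1/namespaces/<namespace>/pods/<name>/eviction
//
// with the Eviction object above as the request body. The API server responds
// with 429 TooManyRequests while a PodDisruptionBudget forbids the eviction,
// which is why evictPods below retries on apierrors.IsTooManyRequests.
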
// deleteOrEvictPods deletes or evicts the pods on the api server
func (o *DrainOptions) deleteOrEvictPods(pods []api.Pod) error {
	if len(pods) == 0 {
		return nil
	}

	policyGroupVersion, err := SupportEviction(o.client)
	if err != nil {
		return err
	}

	getPodFn := func(namespace, name string) (*api.Pod, error) {
		return o.client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
	}

	if len(policyGroupVersion) > 0 {
		return o.evictPods(pods, policyGroupVersion, getPodFn)
	} else {
		return o.deletePods(pods, getPodFn)
	}
}

func (o *DrainOptions) evictPods(pods []api.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error)) error {
	doneCh := make(chan bool, len(pods))
	errCh := make(chan error, 1)

	for _, pod := range pods {
		go func(pod api.Pod, doneCh chan bool, errCh chan error) {
			var err error
			for {
				err = o.evictPod(pod, policyGroupVersion)
				if err == nil {
					break
				} else if apierrors.IsTooManyRequests(err) {
					time.Sleep(5 * time.Second)
				} else {
					errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
					return
				}
			}
			podArray := []api.Pod{pod}
			_, err = o.waitForDelete(podArray, kubectl.Interval, time.Duration(math.MaxInt64), true, getPodFn)
			if err == nil {
				doneCh <- true
			} else {
				errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
			}
		}(pod, doneCh, errCh)
	}

	doneCount := 0
	// 0 timeout means infinite, we use MaxInt64 to represent it.
	var globalTimeout time.Duration
	if o.Timeout == 0 {
		globalTimeout = time.Duration(math.MaxInt64)
	} else {
		globalTimeout = o.Timeout
	}
	for {
		select {
		case err := <-errCh:
			return err
		case <-doneCh:
			doneCount++
			if doneCount == len(pods) {
				return nil
			}
		case <-time.After(globalTimeout):
			return fmt.Errorf("Drain did not complete within %v", globalTimeout)
		}
	}
}

func (o *DrainOptions) deletePods(pods []api.Pod, getPodFn func(namespace, name string) (*api.Pod, error)) error {
	// 0 timeout means infinite, we use MaxInt64 to represent it.
	var globalTimeout time.Duration
	if o.Timeout == 0 {
		globalTimeout = time.Duration(math.MaxInt64)
	} else {
		globalTimeout = o.Timeout
	}
	for _, pod := range pods {
		err := o.deletePod(pod)
		if err != nil {
			return err
		}
	}
	_, err := o.waitForDelete(pods, kubectl.Interval, globalTimeout, false, getPodFn)
	return err
}

func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*api.Pod, error)) ([]api.Pod, error) {
	var verbStr string
	if usingEviction {
		verbStr = "evicted"
	} else {
		verbStr = "deleted"
	}
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		pendingPods := []api.Pod{}
		for i, pod := range pods {
			p, err := getPodFn(pod.Namespace, pod.Name)
			if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
				cmdutil.PrintSuccess(o.mapper, false, o.Out, "pod", pod.Name, false, verbStr)
				continue
			} else if err != nil {
				return false, err
			} else {
				pendingPods = append(pendingPods, pods[i])
			}
		}
		pods = pendingPods
		if len(pendingPods) > 0 {
			return false, nil
		}
		return true, nil
	})
	return pods, err
}

// SupportEviction uses the Discovery API to find out if the server supports
// the eviction subresource. If supported, it will return its groupVersion;
// otherwise it will return "".
func SupportEviction(clientset internalclientset.Interface) (string, error) {
	discoveryClient := clientset.Discovery()
	groupList, err := discoveryClient.ServerGroups()
	if err != nil {
		return "", err
	}
	foundPolicyGroup := false
	var policyGroupVersion string
	for _, group := range groupList.Groups {
		if group.Name == "policy" {
			foundPolicyGroup = true
			policyGroupVersion = group.PreferredVersion.GroupVersion
			break
		}
	}
	if !foundPolicyGroup {
		return "", nil
	}
	resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1")
	if err != nil {
		return "", err
	}
	for _, resource := range resourceList.APIResources {
		if resource.Name == EvictionSubresource && resource.Kind == EvictionKind {
			return policyGroupVersion, nil
		}
	}
	return "", nil
}

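// Illustrative usage, assuming a clientset variable already exists: probe for
// eviction support once and pick a code path, as deleteOrEvictPods does above.
//
//	if gv, err := SupportEviction(clientset); err != nil {
//		return err
//	} else if gv == "" {
//		// eviction subresource not available; fall back to plain deletion
//	} else {
//		// evict using the policy group/version in gv
//	}
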
// RunCordonOrUncordon runs either Cordon or Uncordon.  The desired value for
// "Unschedulable" is passed as the first arg.
func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
	cmdNamespace, _, err := o.Factory.DefaultNamespace()
	if err != nil {
		return err
	}

	if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
		unsched := reflect.ValueOf(o.nodeInfo.Object).Elem().FieldByName("Spec").FieldByName("Unschedulable")
		if unsched.Bool() == desired {
			cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, already(desired))
		} else {
			helper := resource.NewHelper(o.restClient, o.nodeInfo.Mapping)
			unsched.SetBool(desired)
			var err error
			for i := 0; i < kMaxNodeUpdateRetry; i++ {
				// We don't care about what previous versions may exist, we always want
				// to overwrite, and Replace always sets current ResourceVersion if version is "".
				helper.Versioner.SetResourceVersion(o.nodeInfo.Object, "")
				_, err = helper.Replace(cmdNamespace, o.nodeInfo.Name, true, o.nodeInfo.Object)
				if err != nil {
					if !apierrors.IsConflict(err) {
						return err
					}
				} else {
					break
				}
				// It's a race, no need to sleep
			}
			if err != nil {
				return err
			}
			cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, changed(desired))
		}
	} else {
		cmdutil.PrintSuccess(o.mapper, false, o.Out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, false, "skipped")
	}

	return nil
}

// already() and changed() return suitable strings for {un,}cordoning

func already(desired bool) string {
	if desired {
		return "already cordoned"
	}
	return "already uncordoned"
}

func changed(desired bool) string {
	if desired {
		return "cordoned"
	}
	return "uncordoned"
}