
All remaining scalers were replaced by GenericScaler except JobScaler. It is not clear whether JobScaler could use the generic scaler or not. For more details, see the pull request.
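For context, a minimal sketch of how a caller might drive the Reaper API in this file; the helper name stopRC and its clientset parameter are assumptions for illustration, not part of the change:

// stopRC is a hypothetical caller-side helper; it assumes the kubectl
// package itself, api (k8s.io/kubernetes/pkg/apis/core), and
// internalclientset from this file's import block are imported.
func stopRC(cs internalclientset.Interface, namespace, name string) error {
	// ReaperFor returns a NoSuchReaperError for kinds with no reaper.
	reaper, err := kubectl.ReaperFor(api.Kind("ReplicationController"), cs)
	if err != nil {
		return err
	}
	// A zero timeout lets the reaper derive a default from the replica count;
	// a nil gracePeriod falls back to the default delete options.
	return reaper.Stop(namespace, name, 0, nil)
}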
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubectl

import (
	"fmt"
	"strings"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/kubernetes/pkg/apis/apps"
	"k8s.io/kubernetes/pkg/apis/batch"
	api "k8s.io/kubernetes/pkg/apis/core"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	appsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/apps/internalversion"
	batchclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/batch/internalversion"
	coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
	extensionsclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
)

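// Interval and Timeout are the default poll interval and overall wait used
// by the reapers below; callers can override the wait by passing a non-zero
// timeout to Stop.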
const (
	Interval = time.Second * 1
	Timeout  = time.Minute * 5
)

// A Reaper terminates an object as gracefully as possible.
type Reaper interface {
	// Stop a given object within a namespace. timeout is how long we'll
	// wait for the termination to be successful. gracePeriod is time given
	// to an API object for it to delete itself cleanly (e.g., pod
	// shutdown). It may or may not be supported by the API object.
	Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error
}

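// NoSuchReaperError is returned by ReaperFor when no reaper is registered
// for the requested kind.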
type NoSuchReaperError struct {
	kind schema.GroupKind
}

func (n *NoSuchReaperError) Error() string {
	return fmt.Sprintf("no reaper has been implemented for %v", n.kind)
}

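// IsNoSuchReaperError reports whether err is a *NoSuchReaperError.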
func IsNoSuchReaperError(err error) bool {
	_, ok := err.(*NoSuchReaperError)
	return ok
}

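// ReaperFor returns a Reaper for the given kind, wired to the supplied
// clientset, or a NoSuchReaperError if the kind has no registered reaper.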
func ReaperFor(kind schema.GroupKind, c internalclientset.Interface) (Reaper, error) {
	switch kind {
	case api.Kind("ReplicationController"):
		return &ReplicationControllerReaper{c.Core(), Interval, Timeout}, nil

	case extensions.Kind("ReplicaSet"), apps.Kind("ReplicaSet"):
		return &ReplicaSetReaper{c.Extensions(), Interval, Timeout}, nil

	case extensions.Kind("DaemonSet"), apps.Kind("DaemonSet"):
		return &DaemonSetReaper{c.Extensions(), Interval, Timeout}, nil

	case api.Kind("Pod"):
		return &PodReaper{c.Core()}, nil

	case batch.Kind("Job"):
		return &JobReaper{c.Batch(), c.Core(), Interval, Timeout}, nil

	case apps.Kind("StatefulSet"):
		return &StatefulSetReaper{c.Apps(), c.Core(), Interval, Timeout}, nil

	case extensions.Kind("Deployment"), apps.Kind("Deployment"):
		return &DeploymentReaper{c.Extensions(), c.Extensions(), Interval, Timeout}, nil

	}
	return nil, &NoSuchReaperError{kind}
}

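// ReaperForReplicationController returns a ReplicationControllerReaper that
// polls at the default Interval and waits up to the caller-supplied timeout.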
func ReaperForReplicationController(rcClient coreclient.ReplicationControllersGetter, timeout time.Duration) (Reaper, error) {
	return &ReplicationControllerReaper{rcClient, Interval, timeout}, nil
}

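// The concrete reapers. Each bundles the typed client(s) it needs together
// with a poll interval and an overall timeout for termination to complete.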
type ReplicationControllerReaper struct {
	client                coreclient.ReplicationControllersGetter
	pollInterval, timeout time.Duration
}
type ReplicaSetReaper struct {
	client                extensionsclient.ReplicaSetsGetter
	pollInterval, timeout time.Duration
}
type DaemonSetReaper struct {
	client                extensionsclient.DaemonSetsGetter
	pollInterval, timeout time.Duration
}
type JobReaper struct {
	client                batchclient.JobsGetter
	podClient             coreclient.PodsGetter
	pollInterval, timeout time.Duration
}
type DeploymentReaper struct {
	dClient               extensionsclient.DeploymentsGetter
	rsClient              extensionsclient.ReplicaSetsGetter
	pollInterval, timeout time.Duration
}
type PodReaper struct {
	client coreclient.PodsGetter
}
type StatefulSetReaper struct {
	client                appsclient.StatefulSetsGetter
	podClient             coreclient.PodsGetter
	pollInterval, timeout time.Duration
}

// getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller.
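// For example, an rc with selector (k1=v1) overlaps an rc with selector
// (k1=v1,k2=v2): the narrower selector, applied to the broader selector
// treated as a plain label set, matches.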
func getOverlappingControllers(rcClient coreclient.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) {
	rcs, err := rcClient.List(metav1.ListOptions{})
	if err != nil {
		return nil, fmt.Errorf("error getting replication controllers: %v", err)
	}
	var matchingRCs []api.ReplicationController
	rcLabels := labels.Set(rc.Spec.Selector)
	for _, controller := range rcs.Items {
		newRCLabels := labels.Set(controller.Spec.Selector)
		if labels.SelectorFromSet(newRCLabels).Matches(rcLabels) || labels.SelectorFromSet(rcLabels).Matches(newRCLabels) {
			matchingRCs = append(matchingRCs, controller)
		}
	}
	return matchingRCs, nil
}

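// Stop scales the replication controller down to zero (when it overlaps no
// other controllers) and then deletes it, declining to orphan dependents.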
func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	rc := reaper.client.ReplicationControllers(namespace)
	scaler := &ReplicationControllerScaler{reaper.client}
	ctrl, err := rc.Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if timeout == 0 {
		timeout = Timeout + time.Duration(10*ctrl.Spec.Replicas)*time.Second
	}

	// The rc manager will try to detect all matching rcs for a pod's labels,
	// and only sync the oldest one. This means if we have a pod with labels
	// [(k1: v1), (k2: v2)] and two rcs: rc1 with selector [(k1=v1)], and rc2 with selector [(k1=v1),(k2=v2)],
	// the rc manager will sync the older of the two rcs.
	//
	// If there are rcs with a superset of labels, eg:
	// deleting: (k1=v1), superset: (k2=v2, k1=v1)
	//  - It isn't safe to delete the rc because there could be a pod with labels
	//    (k1=v1) that isn't managed by the superset rc. We can't scale it down
	//    either, because there could be a pod (k2=v2, k1=v1) that it deletes
	//    causing a fight with the superset rc.
	// If there are rcs with a subset of labels, eg:
	// deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
	//  - Even if it's safe to delete this rc without a scale down because all its
	//    pods are being controlled by the subset rc, the code returns an error.

	// In theory, creating overlapping controllers is user error, so the loop below
	// tries to account for this logic only in the common case, where we end up
	// with multiple rcs that have an exact match on selectors.

	overlappingCtrls, err := getOverlappingControllers(rc, ctrl)
	if err != nil {
		return fmt.Errorf("error getting replication controllers: %v", err)
	}
	exactMatchRCs := []api.ReplicationController{}
	overlapRCs := []string{}
	for _, overlappingRC := range overlappingCtrls {
		if len(overlappingRC.Spec.Selector) == len(ctrl.Spec.Selector) {
			exactMatchRCs = append(exactMatchRCs, overlappingRC)
		} else {
			overlapRCs = append(overlapRCs, overlappingRC.Name)
		}
	}
	if len(overlapRCs) > 0 {
		return fmt.Errorf(
			"Detected overlapping controllers for rc %v: %v, please manage deletion individually with --cascade=false.",
			ctrl.Name, strings.Join(overlapRCs, ","))
	}
	if len(exactMatchRCs) == 1 {
		// No overlapping controllers.
		retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
		waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
			return err
		}
	}
	falseVar := false
	deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
	return rc.Delete(name, deleteOptions)
}

// TODO(madhusudancs): Implement it when controllerRef is implemented - https://github.com/kubernetes/kubernetes/issues/2210
// getOverlappingReplicaSets finds ReplicaSets that this ReplicaSet overlaps, as well as ReplicaSets overlapping this ReplicaSet.
func getOverlappingReplicaSets(c extensionsclient.ReplicaSetInterface, rs *extensions.ReplicaSet) ([]extensions.ReplicaSet, []extensions.ReplicaSet, error) {
	var overlappingRSs, exactMatchRSs []extensions.ReplicaSet
	return overlappingRSs, exactMatchRSs, nil
}

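// Stop scales the ReplicaSet down to zero (when it overlaps no other
// ReplicaSets) and then deletes it, declining to orphan dependents.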
func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	rsc := reaper.client.ReplicaSets(namespace)
	scaler := &ReplicaSetScaler{reaper.client}
	rs, err := rsc.Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if timeout == 0 {
		timeout = Timeout + time.Duration(10*rs.Spec.Replicas)*time.Second
	}

	// The ReplicaSet controller will try to detect all matching ReplicaSets
	// for a pod's labels, and only sync the oldest one. This means if we have
	// a pod with labels [(k1: v1), (k2: v2)] and two ReplicaSets: rs1 with
	// selector [(k1=v1)], and rs2 with selector [(k1=v1),(k2=v2)], the
	// ReplicaSet controller will sync the older of the two ReplicaSets.
	//
	// If there are ReplicaSets with a superset of labels, eg:
	// deleting: (k1=v1), superset: (k2=v2, k1=v1)
	//  - It isn't safe to delete the ReplicaSet because there could be a pod
	//    with labels (k1=v1) that isn't managed by the superset ReplicaSet.
	//    We can't scale it down either, because there could be a pod
	//    (k2=v2, k1=v1) that it deletes causing a fight with the superset
	//    ReplicaSet.
	// If there are ReplicaSets with a subset of labels, eg:
	// deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
	//  - Even if it's safe to delete this ReplicaSet without a scale down
	//    because all its pods are being controlled by the subset ReplicaSet,
	//    the code returns an error.

	// In theory, creating overlapping ReplicaSets is user error, so the loop below
	// tries to account for this logic only in the common case, where we end up
	// with multiple ReplicaSets that have an exact match on selectors.

	// TODO(madhusudancs): Re-evaluate again when controllerRef is implemented -
	// https://github.com/kubernetes/kubernetes/issues/2210
	overlappingRSs, exactMatchRSs, err := getOverlappingReplicaSets(rsc, rs)
	if err != nil {
		return fmt.Errorf("error getting ReplicaSets: %v", err)
	}

	if len(overlappingRSs) > 0 {
		var names []string
		for _, overlappingRS := range overlappingRSs {
			names = append(names, overlappingRS.Name)
		}
		return fmt.Errorf(
			"Detected overlapping ReplicaSets for ReplicaSet %v: %v, please manage deletion individually with --cascade=false.",
			rs.Name, strings.Join(names, ","))
	}
	if len(exactMatchRSs) == 0 {
		// No overlapping ReplicaSets.
		retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
		waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
			return err
		}
	}

	falseVar := false
	deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
	return rsc.Delete(name, deleteOptions)
}

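// Stop points the DaemonSet's node selector at a label no node should carry
// so the controller deletes the daemon pods, waits for them to disappear,
// and then deletes the DaemonSet itself.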
func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	ds, err := reaper.client.DaemonSets(namespace).Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	// We set the nodeSelector to a random label. This label is nearly guaranteed
	// to not be set on any node so the DaemonSetController will start deleting
	// daemon pods. Once it's done deleting the daemon pods, it's safe to delete
	// the DaemonSet.
	ds.Spec.Template.Spec.NodeSelector = map[string]string{
		string(uuid.NewUUID()): string(uuid.NewUUID()),
	}
	// force update to avoid version conflict
	ds.ResourceVersion = ""

	if ds, err = reaper.client.DaemonSets(namespace).Update(ds); err != nil {
		return err
	}

	// Wait for the daemon set controller to kill all the daemon pods.
	if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) {
		updatedDS, err := reaper.client.DaemonSets(namespace).Get(name, metav1.GetOptions{})
		if err != nil {
			return false, nil
		}

		return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
	}); err != nil {
		return err
	}

	falseVar := false
	deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
	return reaper.client.DaemonSets(namespace).Delete(name, deleteOptions)
}

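// Stop scales the StatefulSet down to zero and then deletes it, leaving any
// volume cleanup to the StatefulSet itself.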
func (reaper *StatefulSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	statefulsets := reaper.client.StatefulSets(namespace)
	scaler := &StatefulSetScaler{reaper.client}
	ss, err := statefulsets.Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}

	if timeout == 0 {
		numReplicas := ss.Spec.Replicas
		// See discussion of this behavior here:
		// https://github.com/kubernetes/kubernetes/pull/46468#discussion_r118589512
		timeout = Timeout + time.Duration(10*numReplicas)*time.Second
	}

	retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
	waitForStatefulSet := NewRetryParams(reaper.pollInterval, timeout)
	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForStatefulSet); err != nil {
		return err
	}

	// TODO: Cleanup volumes? We don't want to accidentally delete volumes from
	// stop, so just leave this up to the statefulset.
	falseVar := false
	deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
	return statefulsets.Delete(name, deleteOptions)
}

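// Stop scales the Job down to zero, deletes the Job's remaining (dead) pods,
// and finally deletes the Job itself.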
func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	jobs := reaper.client.Jobs(namespace)
	pods := reaper.podClient.Pods(namespace)
	scaler := ScalerFor(schema.GroupKind{Group: batch.GroupName, Kind: "Job"}, reaper.client, nil, schema.GroupResource{})
	job, err := jobs.Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if timeout == 0 {
		// we will never have more active pods than job.Spec.Parallelism
		parallelism := *job.Spec.Parallelism
		timeout = Timeout + time.Duration(10*parallelism)*time.Second
	}

	// TODO: handle overlapping jobs
	retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
	waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
	if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
		return err
	}
	// at this point only dead pods are left, which should be removed
	selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
	options := metav1.ListOptions{LabelSelector: selector.String()}
	podList, err := pods.List(options)
	if err != nil {
		return err
	}
	errList := []error{}
	for _, pod := range podList.Items {
		if err := pods.Delete(pod.Name, gracePeriod); err != nil {
			// ignore the error when the pod isn't found
			if !errors.IsNotFound(err) {
				errList = append(errList, err)
			}
		}
	}
	if len(errList) > 0 {
		return utilerrors.NewAggregate(errList)
	}
	// once we have all the pods removed we can safely remove the job itself.
	falseVar := false
	deleteOptions := &metav1.DeleteOptions{OrphanDependents: &falseVar}
	return jobs.Delete(name, deleteOptions)
}

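// Stop pauses the deployment, scales it to zero with an empty revision
// history, reaps each of its ReplicaSets, and deletes the deployment last so
// that a failed reap can be retried.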
func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	deployments := reaper.dClient.Deployments(namespace)
	rsReaper := &ReplicaSetReaper{reaper.rsClient, reaper.pollInterval, reaper.timeout}

	deployment, err := reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
		// set deployment's history and scale to 0
		// TODO replace with patch when available: https://github.com/kubernetes/kubernetes/issues/20527
		rhl := int32(0)
		d.Spec.RevisionHistoryLimit = &rhl
		d.Spec.Replicas = 0
		d.Spec.Paused = true
	})
	if err != nil {
		return err
	}
	if deployment.Initializers != nil {
		var falseVar = false
		nonOrphanOption := metav1.DeleteOptions{OrphanDependents: &falseVar}
		return deployments.Delete(name, &nonOrphanOption)
	}

	// Use observedGeneration to determine if the deployment controller noticed the pause.
	if err := deploymentutil.WaitForObservedDeploymentInternal(func() (*extensions.Deployment, error) {
		return deployments.Get(name, metav1.GetOptions{})
	}, deployment.Generation, 1*time.Second, 1*time.Minute); err != nil {
		return err
	}

	// Stop all replica sets belonging to this Deployment.
	rss, err := deploymentutil.ListReplicaSetsInternal(deployment,
		func(namespace string, options metav1.ListOptions) ([]*extensions.ReplicaSet, error) {
			rsList, err := reaper.rsClient.ReplicaSets(namespace).List(options)
			if err != nil {
				return nil, err
			}
			rss := make([]*extensions.ReplicaSet, 0, len(rsList.Items))
			for i := range rsList.Items {
				rss = append(rss, &rsList.Items[i])
			}
			return rss, nil
		})
	if err != nil {
		return err
	}

	errList := []error{}
	for _, rs := range rss {
		if err := rsReaper.Stop(rs.Namespace, rs.Name, timeout, gracePeriod); err != nil {
			scaleGetErr, ok := err.(ScaleError)
			if errors.IsNotFound(err) || (ok && errors.IsNotFound(scaleGetErr.ActualError)) {
				continue
			}
			errList = append(errList, err)
		}
	}
	if len(errList) > 0 {
		return utilerrors.NewAggregate(errList)
	}

	// Delete deployment at the end.
	// Note: We delete deployment at the end so that if removing RSs fails, we at least have the deployment to retry.
	var falseVar = false
	nonOrphanOption := metav1.DeleteOptions{OrphanDependents: &falseVar}
	return deployments.Delete(name, &nonOrphanOption)
}

type updateDeploymentFunc func(d *extensions.Deployment)

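// updateDeploymentWithRetries applies applyUpdate to the latest version of
// the named deployment, retrying on update conflicts until the poll window
// expires.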
func (reaper *DeploymentReaper) updateDeploymentWithRetries(namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) {
	deployments := reaper.dClient.Deployments(namespace)
	err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
		if deployment, err = deployments.Get(name, metav1.GetOptions{}); err != nil {
			return false, err
		}
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(deployment)
		if deployment, err = deployments.Update(deployment); err == nil {
			return true, nil
		}
		// Retry only on update conflict.
		if errors.IsConflict(err) {
			return false, nil
		}
		return false, err
	})
	return deployment, err
}

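// Stop verifies that the pod exists and then deletes it with the
// caller-supplied grace period.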
func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *metav1.DeleteOptions) error {
	pods := reaper.client.Pods(namespace)
	_, err := pods.Get(name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	return pods.Delete(name, gracePeriod)
}