Refactor namespace controller to use workers, do more delete collection calls

derekwaynecarr
2016-02-03 14:43:21 -05:00
parent b4822a4768
commit 106693d9b3
8 changed files with 468 additions and 549 deletions
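Note: the refactor replaces the controller's built-in Stop()/StopEverything lifecycle with a work queue drained by a configurable pool of workers; the informer's event handlers now only enqueue namespace keys. A minimal sketch of how a caller might drive the new API, assuming kubeClient and serverVersions were already constructed (those names, the import alias, and the worker count are illustrative, not from this commit):

	stopCh := make(chan struct{})
	nsController := namespacecontroller.NewNamespaceController(kubeClient, serverVersions, 5*time.Minute)
	go nsController.Run(2, stopCh) // two concurrent workers
	// later: close(stopCh) replaces the removed Stop() method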

pkg/controller/namespace/namespace_controller.go

@@ -17,19 +17,18 @@ limitations under the License.
 package namespace
 
 import (
-	"fmt"
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
-	extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
+	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
-	"k8s.io/kubernetes/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/util/wait"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 	"k8s.io/kubernetes/pkg/watch"
 
 	"github.com/golang/glog"
@@ -37,14 +36,29 @@ import (
 // NamespaceController is responsible for performing actions dependent upon a namespace phase
 type NamespaceController struct {
-	controller     *framework.Controller
-	StopEverything chan struct{}
+	// client that purges namespace content, must have list/delete privileges on all content
+	kubeClient clientset.Interface
+	// store that holds the namespaces
+	store cache.Store
+	// controller that observes the namespaces
+	controller *framework.Controller
+	// namespaces that have been queued up for processing by workers
+	queue *workqueue.Type
+	// list of versions to process
+	versions *unversioned.APIVersions
 }
 
 // NewNamespaceController creates a new NamespaceController
 func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController {
-	var controller *framework.Controller
-	_, controller = framework.NewInformer(
+	// create the controller so we can inject the enqueue function
+	namespaceController := &NamespaceController{
+		kubeClient: kubeClient,
+		versions:   versions,
+		queue:      workqueue.New(),
+	}
+
+	// configure the backing store/controller
+	store, controller := framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
 				return kubeClient.Core().Namespaces().List(options)
@@ -54,530 +68,88 @@ func NewNamespaceController(kubeClient clientset.Interface, versions *unversione
 			},
 		},
 		&api.Namespace{},
 		// TODO: Can we have much longer period here?
 		resyncPeriod,
 		framework.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				namespace := obj.(*api.Namespace)
-				if err := syncNamespace(kubeClient, versions, namespace); err != nil {
-					if estimate, ok := err.(*contentRemainingError); ok {
-						go func() {
-							// Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s
-							// for pods. However, most processes will terminate faster - within a few seconds, probably
-							// with a peak within 5-10s. So this division is a heuristic that avoids waiting the full
-							// duration when in many cases things complete more quickly. The extra second added is to
-							// ensure we never wait 0 seconds.
-							t := estimate.Estimate/2 + 1
-							glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
-							time.Sleep(time.Duration(t) * time.Second)
-							if err := controller.Requeue(namespace); err != nil {
-								utilruntime.HandleError(err)
-							}
-						}()
-						return
-					}
-					utilruntime.HandleError(err)
-				}
+				namespaceController.enqueueNamespace(namespace)
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
 				namespace := newObj.(*api.Namespace)
-				if err := syncNamespace(kubeClient, versions, namespace); err != nil {
-					if estimate, ok := err.(*contentRemainingError); ok {
-						go func() {
-							t := estimate.Estimate/2 + 1
-							glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
-							time.Sleep(time.Duration(t) * time.Second)
-							if err := controller.Requeue(namespace); err != nil {
-								utilruntime.HandleError(err)
-							}
-						}()
-						return
-					}
-					utilruntime.HandleError(err)
-				}
+				namespaceController.enqueueNamespace(namespace)
 			},
 		},
 	)
-	return &NamespaceController{
-		controller: controller,
-	}
+
+	namespaceController.store = store
+	namespaceController.controller = controller
+	return namespaceController
 }
-// Run begins observing the system. It starts a goroutine and returns immediately.
-func (nm *NamespaceController) Run() {
-	if nm.StopEverything == nil {
-		nm.StopEverything = make(chan struct{})
-		go nm.controller.Run(nm.StopEverything)
-	}
-}
+// enqueueNamespace adds an object to the controller work queue
+// obj could be an *api.Namespace, or a DeletionFinalStateUnknown item.
+func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
+	key, err := controller.KeyFunc(obj)
+	if err != nil {
+		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		return
+	}
+	nm.queue.Add(key)
+}
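Note: enqueueNamespace stores a string key rather than the object itself; controller.KeyFunc is the standard cache key function, and for a cluster-scoped object like a Namespace the key is simply the object name. An illustrative (not from this commit) use of the underlying key function:

	// For cluster-scoped objects the key has no "namespace/" prefix.
	ns := &api.Namespace{ObjectMeta: api.ObjectMeta{Name: "default"}}
	key, _ := cache.MetaNamespaceKeyFunc(ns)
	// key == "default"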
-// Stop gracefully shutsdown this controller
-func (nm *NamespaceController) Stop() {
-	if nm.StopEverything != nil {
-		close(nm.StopEverything)
-		nm.StopEverything = nil
-	}
-}
-// finalized returns true if the spec.finalizers is empty list
-func finalized(namespace *api.Namespace) bool {
-	return len(namespace.Spec.Finalizers) == 0
-}

-// finalize will finalize the namespace for kubernetes
-func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
-	namespaceFinalize := api.Namespace{}
-	namespaceFinalize.ObjectMeta = namespace.ObjectMeta
-	namespaceFinalize.Spec = namespace.Spec
-	finalizerSet := sets.NewString()
-	for i := range namespace.Spec.Finalizers {
-		if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes {
-			finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
-		}
-	}
-	namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, 0, len(finalizerSet))
-	for _, value := range finalizerSet.List() {
-		namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value))
-	}
-	namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize)
-	if err != nil {
-		// it was removed already, so life is good
-		if errors.IsNotFound(err) {
-			return namespace, nil
-		}
-	}
-	return namespace, err
-}

-type contentRemainingError struct {
-	Estimate int64
-}

-func (e *contentRemainingError) Error() string {
-	return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
-}
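Note: contentRemainingError carries a seconds estimate so callers can back off instead of spinning; the requeue delay used throughout this file is Estimate/2 + 1. For example, with the default pod TerminationGracePeriodSeconds of 30:

	t := int64(30)/2 + 1 // requeue after 16 seconds rather than the full 30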
-// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate
-// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources
-// are guaranteed to be gone.
-func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) {
-	err = deleteServiceAccounts(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	err = deleteServices(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	err = deleteReplicationControllers(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	estimate, err = deletePods(kubeClient, namespace, before)
-	if err != nil {
-		return estimate, err
-	}
-	err = deleteSecrets(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	err = deletePersistentVolumeClaims(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	err = deleteLimitRanges(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	err = deleteResourceQuotas(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	err = deleteEvents(kubeClient, namespace)
-	if err != nil {
-		return estimate, err
-	}
-	// If experimental mode, delete all experimental resources for the namespace.
-	if containsVersion(versions, "extensions/v1beta1") {
-		resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1")
-		if err != nil {
-			return estimate, err
-		}
-		if containsResource(resources, "horizontalpodautoscalers") {
-			err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
-			if err != nil {
-				return estimate, err
-			}
-		}
-		if containsResource(resources, "ingresses") {
-			err = deleteIngress(kubeClient.Extensions(), namespace)
-			if err != nil {
-				return estimate, err
-			}
-		}
-		if containsResource(resources, "daemonsets") {
-			err = deleteDaemonSets(kubeClient.Extensions(), namespace)
-			if err != nil {
-				return estimate, err
-			}
-		}
-		if containsResource(resources, "jobs") {
-			err = deleteJobs(kubeClient.Extensions(), namespace)
-			if err != nil {
-				return estimate, err
-			}
-		}
-		if containsResource(resources, "deployments") {
-			err = deleteDeployments(kubeClient.Extensions(), namespace)
-			if err != nil {
-				return estimate, err
-			}
-		}
-		if containsResource(resources, "replicasets") {
-			err = deleteReplicaSets(kubeClient.Extensions(), namespace)
-			if err != nil {
-				return estimate, err
-			}
-		}
-	}
-	return estimate, nil
-}
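Note: the delete helpers invoked above each list a collection and then delete items one at a time. Per the commit title, these per-item loops are the target for delete-collection calls; deleteEvents further down already shows the shape. A hedged sketch of the equivalent bulk call for one of the loops, assuming the generated client exposes DeleteCollection for the resource as it does for events:

	// one round trip instead of a list plus N deletes
	err := kubeClient.Core().LimitRanges(ns).DeleteCollection(nil, api.ListOptions{})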
-// updateNamespaceFunc is a function that makes an update to a namespace
-type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error)

-// retryOnConflictError retries the specified fn if there was a conflict error
-// TODO RetryOnConflict should be a generic concept in client code
-func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) {
-	latestNamespace := namespace
-	for {
-		result, err = fn(kubeClient, latestNamespace)
-		if err == nil {
-			return result, nil
-		}
-		if !errors.IsConflict(err) {
-			return nil, err
-		}
-		latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name)
-		if err != nil {
-			return nil, err
-		}
-	}
-	return
-}

+// worker processes the queue of namespace objects.
+// Each namespace can be in the queue at most once.
+// The system ensures that no two workers can process
+// the same namespace at the same time.
+func (nm *NamespaceController) worker() {
+	for {
+		func() {
+			key, quit := nm.queue.Get()
+			if quit {
+				return
+			}
+			defer nm.queue.Done(key)
+			if err := nm.syncNamespaceFromKey(key.(string)); err != nil {
+				if estimate, ok := err.(*contentRemainingError); ok {
+					go func() {
+						t := estimate.Estimate/2 + 1
+						glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
+						time.Sleep(time.Duration(t) * time.Second)
+						nm.queue.Add(key)
+					}()
+				}
+			}
+		}()
+	}
+}
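Note: the at-most-once guarantees in the comment come from workqueue.Type itself: Add deduplicates keys that are already queued, and Get marks a key as in flight so no other worker can receive it until Done is called. A rough illustration of the contract this worker relies on:

	q := workqueue.New()
	q.Add("default")
	q.Add("default")  // no-op: "default" is already queued
	key, _ := q.Get() // "default" is now in flight; other workers cannot get it
	q.Add("default")  // remembered, but not delivered until Done is called
	q.Done(key)       // "default" may now be handed to a worker again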
-// updateNamespaceStatusFunc will verify that the status of the namespace is correct
-func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
-	if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating {
-		return namespace, nil
-	}
-	newNamespace := api.Namespace{}
-	newNamespace.ObjectMeta = namespace.ObjectMeta
-	newNamespace.Status = namespace.Status
-	newNamespace.Status.Phase = api.NamespaceTerminating
-	return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace)
-}
-// syncNamespace orchestrates deletion of a Namespace and its associated content.
-func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error {
-	if namespace.DeletionTimestamp == nil {
-		return nil
-	}
-	// multiple controllers may edit a namespace during termination
-	// first get the latest state of the namespace before proceeding
-	// if the namespace was deleted already, don't do anything
-	namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name)
-	if err != nil {
-		if errors.IsNotFound(err) {
-			return nil
-		}
-		return err
-	}
-	glog.V(4).Infof("Syncing namespace %s", namespace.Name)
-	// ensure that the status is up to date on the namespace
-	// if we get a not found error, we assume the namespace is truly gone
-	namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
-	if err != nil {
-		if errors.IsNotFound(err) {
-			return nil
-		}
-		return err
-	}
-	// if the namespace is already finalized, delete it
-	if finalized(namespace) {
-		err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-		return nil
-	}
-	// there may still be content for us to remove
-	estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp)
-	if err != nil {
-		return err
-	}
-	if estimate > 0 {
-		return &contentRemainingError{estimate}
-	}
-	// we have removed content, so mark it finalized by us
-	result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc)
-	if err != nil {
-		return err
-	}
-	// now check if all finalizers have reported that we delete now
-	if finalized(result) {
-		err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}
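Note: syncNamespace is the whole termination state machine: mark the namespace Terminating, purge its content, strip the kubernetes finalizer, and only then delete the namespace object itself. The contentRemainingError return is the one non-fatal path; callers detect it by type assertion, roughly:

	if err := syncNamespace(kubeClient, versions, ns); err != nil {
		if remaining, ok := err.(*contentRemainingError); ok {
			// not a failure: requeue and retry after roughly remaining.Estimate/2+1 seconds
		}
	}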
-func deleteLimitRanges(kubeClient clientset.Interface, ns string) error {
-	items, err := kubeClient.Core().LimitRanges(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := kubeClient.Core().LimitRanges(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error {
-	resourceQuotas, err := kubeClient.Core().ResourceQuotas(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range resourceQuotas.Items {
-		err := kubeClient.Core().ResourceQuotas(ns).Delete(resourceQuotas.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error {
-	items, err := kubeClient.Core().ServiceAccounts(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := kubeClient.Core().ServiceAccounts(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteServices(kubeClient clientset.Interface, ns string) error {
-	items, err := kubeClient.Core().Services(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error {
-	items, err := kubeClient.Core().ReplicationControllers(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := kubeClient.Core().ReplicationControllers(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}
-func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) {
-	items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{})
-	if err != nil {
-		return 0, err
-	}
-	expired := unversioned.Now().After(before.Time)
-	var deleteOptions *api.DeleteOptions
-	if expired {
-		deleteOptions = api.NewDeleteOptions(0)
-	}
-	estimate := int64(0)
-	for i := range items.Items {
-		if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
-			grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
-			if grace > estimate {
-				estimate = grace
-			}
-		}
-		err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions)
-		if err != nil && !errors.IsNotFound(err) {
-			return 0, err
-		}
-	}
-	if expired {
-		estimate = 0
-	}
-	return estimate, nil
-}
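Note: deletePods returns the largest TerminationGracePeriodSeconds it saw, so the caller waits for the slowest pod; once the namespace's deletion timestamp (the `before` argument) has passed, pods are force-deleted with a zero grace period and the estimate collapses to 0. For example, pods with grace periods of 10, 30, and 45 seconds yield an estimate of 45 on the first pass, and 0 on any pass after `before` expires.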
-func deleteEvents(kubeClient clientset.Interface, ns string) error {
-	return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{})
-}

+// syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it
+func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
+	startTime := time.Now()
+	defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime))
+
+	obj, exists, err := nm.store.GetByKey(key)
+	if !exists {
+		glog.Infof("Namespace has been deleted %v", key)
+		return nil
+	}
+	if err != nil {
+		glog.Infof("Unable to retrieve namespace %v from store: %v", key, err)
+		nm.queue.Add(key)
+		return err
+	}
+	namespace := obj.(*api.Namespace)
+	return syncNamespace(nm.kubeClient, nm.versions, namespace)
+}

-func deleteSecrets(kubeClient clientset.Interface, ns string) error {
-	items, err := kubeClient.Core().Secrets(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := kubeClient.Core().Secrets(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}
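Note: syncNamespaceFromKey resolves the queued key against the informer's local store rather than the API server, so a missing entry means the namespace is already gone and the work item can be dropped. Two observations on the code as written: the queue has no rate limiting, so the error path's manual re-Add retries a persistently failing key immediately; and because `!exists` is checked before `err`, a store lookup error would be logged as a deletion and returned as success, never reaching the requeue branch. Checking `err` first would preserve that path. (Also, time.Now().Sub(startTime) is equivalent to the more idiomatic time.Since(startTime).)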
-func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error {
-	items, err := kubeClient.Core().PersistentVolumeClaims(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := kubeClient.Core().PersistentVolumeClaims(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

+// Run starts observing the system with the specified number of workers.
+func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	go nm.controller.Run(stopCh)
+	for i := 0; i < workers; i++ {
+		go wait.Until(nm.worker, time.Second, stopCh)
+	}
+	<-stopCh
+	glog.Infof("Shutting down NamespaceController")
+	nm.queue.ShutDown()
+}
-func deleteHorizontalPodAutoscalers(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-	items, err := expClient.HorizontalPodAutoscalers(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := expClient.HorizontalPodAutoscalers(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteDaemonSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-	items, err := expClient.DaemonSets(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := expClient.DaemonSets(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteJobs(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-	items, err := expClient.Jobs(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := expClient.Jobs(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteDeployments(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-	items, err := expClient.Deployments(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := expClient.Deployments(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-	items, err := expClient.Ingresses(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := expClient.Ingresses(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}

-func deleteReplicaSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
-	items, err := expClient.ReplicaSets(ns).List(api.ListOptions{})
-	if err != nil {
-		return err
-	}
-	for i := range items.Items {
-		err := expClient.ReplicaSets(ns).Delete(items.Items[i].Name, nil)
-		if err != nil && !errors.IsNotFound(err) {
-			return err
-		}
-	}
-	return nil
-}
-// TODO: this is duplicated logic. Move it somewhere central?
-func containsVersion(versions *unversioned.APIVersions, version string) bool {
-	for ix := range versions.Versions {
-		if versions.Versions[ix] == version {
-			return true
-		}
-	}
-	return false
-}

-// TODO: this is duplicated logic. Move it somewhere central?
-func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
-	if resources == nil {
-		return false
-	}
-	for ix := range resources.APIResources {
-		resource := resources.APIResources[ix]
-		if resource.Name == resourceName {
-			return true
-		}
-	}
-	return false
-}