Merge pull request #26355 from deads2k/refactor-quota-controller-access
Automatic merge from submit-queue refactor quota evaluation to cleanly abstract the quota access This refactor cleanly separates out the quota accessor parts of the evaluator. This change made it easier to shutdown nicely and pulls the object retrieval logic (which has become rather complex in and of itself) out of the main evaluation flow. @derekwaynecarr
This commit is contained in:
commit
85d5b8db9a
@ -18,8 +18,6 @@ package resourcequota
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
@ -28,7 +26,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/quota"
|
||||
"k8s.io/kubernetes/pkg/quota/install"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -44,7 +41,7 @@ func init() {
|
||||
type quotaAdmission struct {
|
||||
*admission.Handler
|
||||
|
||||
evaluator *quotaEvaluator
|
||||
evaluator Evaluator
|
||||
}
|
||||
|
||||
type liveLookupEntry struct {
|
||||
@ -56,13 +53,13 @@ type liveLookupEntry struct {
|
||||
// using the provided registry. The registry must have the capability to handle group/kinds that
|
||||
// are persisted by the server this admission controller is intercepting
|
||||
func NewResourceQuota(client clientset.Interface, registry quota.Registry, numEvaluators int, stopCh <-chan struct{}) (admission.Interface, error) {
|
||||
evaluator, err := newQuotaEvaluator(client, registry)
|
||||
quotaAccessor, err := newQuotaAccessor(client)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go quotaAccessor.Run(stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(numEvaluators, stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, registry, numEvaluators, stopCh)
|
||||
|
||||
return "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
@ -77,47 +74,5 @@ func (q *quotaAdmission) Admit(a admission.Attributes) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// if we do not know how to evaluate use for this kind, just ignore
|
||||
evaluators := q.evaluator.registry.Evaluators()
|
||||
evaluator, found := evaluators[a.GetKind().GroupKind()]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
|
||||
// for this kind, check if the operation could mutate any quota resources
|
||||
// if no resources tracked by quota are impacted, then just return
|
||||
op := a.GetOperation()
|
||||
operationResources := evaluator.OperationResources(op)
|
||||
if len(operationResources) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return q.evaluator.evaluate(a)
|
||||
}
|
||||
|
||||
// prettyPrint formats a resource list for usage in errors
|
||||
// it outputs resources sorted in increasing order
|
||||
func prettyPrint(item api.ResourceList) string {
|
||||
parts := []string{}
|
||||
keys := []string{}
|
||||
for key := range item {
|
||||
keys = append(keys, string(key))
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, key := range keys {
|
||||
value := item[api.ResourceName(key)]
|
||||
constraint := key + "=" + value.String()
|
||||
parts = append(parts, constraint)
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
// hasUsageStats returns true if for each hard constraint there is a value for its current usage
|
||||
func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
|
||||
for resourceName := range resourceQuota.Status.Hard {
|
||||
if _, found := resourceQuota.Status.Used[resourceName]; !found {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
return q.evaluator.Evaluate(a)
|
||||
}
|
||||
|
@ -144,12 +144,15 @@ func TestAdmissionIgnoresSubresources(t *testing.T) {
|
||||
resourceQuota.Status.Used[api.ResourceMemory] = resource.MustParse("1Gi")
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -185,12 +188,15 @@ func TestAdmitBelowQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -265,12 +271,14 @@ func TestAdmitHandlesOldObjects(t *testing.T) {
|
||||
// start up quota system
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -353,12 +361,15 @@ func TestAdmitExceedQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -394,12 +405,15 @@ func TestAdmitEnforceQuotaConstraints(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -444,13 +458,16 @@ func TestAdmitPodInNamespaceWithoutQuota(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.liveLookupCache = liveLookupCache
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
quotaAccessor.liveLookupCache = liveLookupCache
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -508,12 +525,15 @@ func TestAdmitBelowTerminatingQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuotaTerminating, resourceQuotaNonTerminating)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -610,12 +630,15 @@ func TestAdmitBelowBestEffortQuotaLimit(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuotaBestEffort, resourceQuotaNotBestEffort)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -699,12 +722,15 @@ func TestAdmitBestEffortQuotaLimitIgnoresBurstable(t *testing.T) {
|
||||
}
|
||||
kubeClient := fake.NewSimpleClientset(resourceQuota)
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc})
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
@ -814,13 +840,16 @@ func TestAdmissionSetsMissingNamespace(t *testing.T) {
|
||||
podEvaluator.GroupKind(): podEvaluator,
|
||||
},
|
||||
}
|
||||
evaluator, _ := newQuotaEvaluator(kubeClient, install.NewRegistry(kubeClient))
|
||||
evaluator.indexer = indexer
|
||||
evaluator.registry = registry
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
|
||||
quotaAccessor, _ := newQuotaAccessor(kubeClient)
|
||||
quotaAccessor.indexer = indexer
|
||||
go quotaAccessor.Run(stopCh)
|
||||
evaluator := NewQuotaEvaluator(quotaAccessor, install.NewRegistry(kubeClient), 5, stopCh)
|
||||
evaluator.(*quotaEvaluator).registry = registry
|
||||
|
||||
defer utilruntime.HandleCrash()
|
||||
go evaluator.Run(5, stopCh)
|
||||
handler := "aAdmission{
|
||||
Handler: admission.NewHandler(admission.Create, admission.Update),
|
||||
evaluator: evaluator,
|
||||
|
@ -18,46 +18,36 @@ package resourcequota
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
|
||||
"k8s.io/kubernetes/pkg/admission"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/quota"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
type quotaEvaluator struct {
|
||||
client clientset.Interface
|
||||
// Evaluator is used to see if quota constraints are satisfied.
|
||||
type Evaluator interface {
|
||||
// Evaluate takes an operation and checks to see if quota constraints are satisfied. It returns an error if they are not.
|
||||
// The default implementation process related operations in chunks when possible.
|
||||
Evaluate(a admission.Attributes) error
|
||||
}
|
||||
|
||||
type quotaEvaluator struct {
|
||||
quotaAccessor QuotaAccessor
|
||||
|
||||
// indexer that holds quota objects by namespace
|
||||
indexer cache.Indexer
|
||||
// registry that knows how to measure usage for objects
|
||||
registry quota.Registry
|
||||
|
||||
// liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures.
|
||||
// This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
|
||||
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
||||
liveLookupCache *lru.Cache
|
||||
liveTTL time.Duration
|
||||
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
|
||||
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
|
||||
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
|
||||
updatedQuotas *lru.Cache
|
||||
|
||||
// TODO these are used together to bucket items by namespace and then batch them up for processing.
|
||||
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
|
||||
// We could move this into a library if another component needed it.
|
||||
@ -67,6 +57,11 @@ type quotaEvaluator struct {
|
||||
work map[string][]*admissionWaiter
|
||||
dirtyWork map[string][]*admissionWaiter
|
||||
inProgress sets.String
|
||||
|
||||
// controls the run method so that we can cleanly conform to the Evaluator interface
|
||||
workers int
|
||||
stopCh <-chan struct{}
|
||||
init sync.Once
|
||||
}
|
||||
|
||||
type admissionWaiter struct {
|
||||
@ -98,52 +93,33 @@ func newAdmissionWaiter(a admission.Attributes) *admissionWaiter {
|
||||
}
|
||||
}
|
||||
|
||||
// newQuotaEvaluator configures an admission controller that can enforce quota constraints
|
||||
// NewQuotaEvaluator configures an admission controller that can enforce quota constraints
|
||||
// using the provided registry. The registry must have the capability to handle group/kinds that
|
||||
// are persisted by the server this admission controller is intercepting
|
||||
func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*quotaEvaluator, error) {
|
||||
liveLookupCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
updatedCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
}
|
||||
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
|
||||
|
||||
reflector.Run()
|
||||
func NewQuotaEvaluator(quotaAccessor QuotaAccessor, registry quota.Registry, workers int, stopCh <-chan struct{}) Evaluator {
|
||||
return "aEvaluator{
|
||||
client: client,
|
||||
indexer: indexer,
|
||||
registry: registry,
|
||||
liveLookupCache: liveLookupCache,
|
||||
liveTTL: time.Duration(30 * time.Second),
|
||||
updatedQuotas: updatedCache,
|
||||
quotaAccessor: quotaAccessor,
|
||||
|
||||
registry: registry,
|
||||
|
||||
queue: workqueue.New(),
|
||||
work: map[string][]*admissionWaiter{},
|
||||
dirtyWork: map[string][]*admissionWaiter{},
|
||||
inProgress: sets.String{},
|
||||
}, nil
|
||||
|
||||
workers: workers,
|
||||
stopCh: stopCh,
|
||||
}
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (e *quotaEvaluator) Run(workers int, stopCh <-chan struct{}) {
|
||||
func (e *quotaEvaluator) run() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(e.doWork, time.Second, stopCh)
|
||||
for i := 0; i < e.workers; i++ {
|
||||
go wait.Until(e.doWork, time.Second, e.stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
<-e.stopCh
|
||||
glog.Infof("Shutting down quota evaluator")
|
||||
e.queue.ShutDown()
|
||||
}
|
||||
@ -179,7 +155,7 @@ func (e *quotaEvaluator) checkAttributes(ns string, admissionAttributes []*admis
|
||||
}
|
||||
}()
|
||||
|
||||
quotas, err := e.getQuotas(ns)
|
||||
quotas, err := e.quotaAccessor.GetQuotas(ns)
|
||||
if err != nil {
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
admissionAttribute.result = err
|
||||
@ -257,14 +233,9 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
|
||||
continue
|
||||
}
|
||||
|
||||
if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
|
||||
if err := e.quotaAccessor.UpdateQuotaStatus(&newQuota); err != nil {
|
||||
updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
|
||||
lastErr = err
|
||||
|
||||
} else {
|
||||
// update our cache
|
||||
e.updateCache(updatedQuota)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -293,7 +264,7 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
|
||||
// you've added a new documented, then updated an old one, your resource matches both and you're only checking one
|
||||
// updates for these quota names failed. Get the current quotas in the namespace, compare by name, check to see if the
|
||||
// resource versions have changed. If not, we're going to fall through an fail everything. If they all have, then we can try again
|
||||
newQuotas, err := e.getQuotas(quotas[0].Namespace)
|
||||
newQuotas, err := e.quotaAccessor.GetQuotas(quotas[0].Namespace)
|
||||
if err != nil {
|
||||
// this means that updates failed. Anything with a default deny error has failed and we need to let them know
|
||||
for _, admissionAttribute := range admissionAttributes {
|
||||
@ -416,7 +387,25 @@ func (e *quotaEvaluator) checkRequest(quotas []api.ResourceQuota, a admission.At
|
||||
return quotas, nil
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) evaluate(a admission.Attributes) error {
|
||||
func (e *quotaEvaluator) Evaluate(a admission.Attributes) error {
|
||||
e.init.Do(func() {
|
||||
go e.run()
|
||||
})
|
||||
|
||||
// if we do not know how to evaluate use for this kind, just ignore
|
||||
evaluators := e.registry.Evaluators()
|
||||
evaluator, found := evaluators[a.GetKind().GroupKind()]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
// for this kind, check if the operation could mutate any quota resources
|
||||
// if no resources tracked by quota are impacted, then just return
|
||||
op := a.GetOperation()
|
||||
operationResources := evaluator.OperationResources(op)
|
||||
if len(operationResources) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
waiter := newAdmissionWaiter(a)
|
||||
|
||||
e.addWork(waiter)
|
||||
@ -485,72 +474,29 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
|
||||
return ns, []*admissionWaiter{}, false
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
|
||||
key := quota.Namespace + "/" + quota.Name
|
||||
e.updatedQuotas.Add(key, quota)
|
||||
// prettyPrint formats a resource list for usage in errors
|
||||
// it outputs resources sorted in increasing order
|
||||
func prettyPrint(item api.ResourceList) string {
|
||||
parts := []string{}
|
||||
keys := []string{}
|
||||
for key := range item {
|
||||
keys = append(keys, string(key))
|
||||
}
|
||||
sort.Strings(keys)
|
||||
for _, key := range keys {
|
||||
value := item[api.ResourceName(key)]
|
||||
constraint := key + "=" + value.String()
|
||||
parts = append(parts, constraint)
|
||||
}
|
||||
return strings.Join(parts, ",")
|
||||
}
|
||||
|
||||
var etcdVersioner = etcd.APIObjectVersioner{}
|
||||
|
||||
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
|
||||
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
|
||||
// being monotonically increasing integers
|
||||
func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
|
||||
key := quota.Namespace + "/" + quota.Name
|
||||
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
|
||||
if !ok {
|
||||
return quota
|
||||
}
|
||||
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
|
||||
|
||||
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
|
||||
e.updatedQuotas.Remove(key)
|
||||
return quota
|
||||
}
|
||||
return cachedQuota
|
||||
}
|
||||
|
||||
func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
|
||||
// determine if there are any quotas in this namespace
|
||||
// if there are no quotas, we don't need to do anything
|
||||
items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error resolving quota.")
|
||||
}
|
||||
|
||||
// if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
|
||||
if len(items) == 0 {
|
||||
lruItemObj, ok := e.liveLookupCache.Get(namespace)
|
||||
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
|
||||
// TODO: If there are multiple operations at the same time and cache has just expired,
|
||||
// this may cause multiple List operations being issued at the same time.
|
||||
// If there is already in-flight List() for a given namespace, we should wait until
|
||||
// it is finished and cache is updated instead of doing the same, also to avoid
|
||||
// throttling - see #22422 for details.
|
||||
liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
|
||||
for i := range liveList.Items {
|
||||
newEntry.items = append(newEntry.items, &liveList.Items[i])
|
||||
}
|
||||
e.liveLookupCache.Add(namespace, newEntry)
|
||||
lruItemObj = newEntry
|
||||
}
|
||||
lruEntry := lruItemObj.(liveLookupEntry)
|
||||
for i := range lruEntry.items {
|
||||
items = append(items, lruEntry.items[i])
|
||||
// hasUsageStats returns true if for each hard constraint there is a value for its current usage
|
||||
func hasUsageStats(resourceQuota *api.ResourceQuota) bool {
|
||||
for resourceName := range resourceQuota.Status.Hard {
|
||||
if _, found := resourceQuota.Status.Used[resourceName]; !found {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
resourceQuotas := []api.ResourceQuota{}
|
||||
for i := range items {
|
||||
quota := items[i].(*api.ResourceQuota)
|
||||
quota = e.checkCache(quota)
|
||||
// always make a copy. We're going to muck around with this and we should never mutate the originals
|
||||
resourceQuotas = append(resourceQuotas, *quota)
|
||||
}
|
||||
|
||||
return resourceQuotas, nil
|
||||
return true
|
||||
}
|
||||
|
179
plugin/pkg/admission/resourcequota/resource_access.go
Normal file
179
plugin/pkg/admission/resourcequota/resource_access.go
Normal file
@ -0,0 +1,179 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package resourcequota
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/storage/etcd"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// QuotaAccessor abstracts the get/set logic from the rest of the Evaluator. This could be a test stub, a straight passthrough,
|
||||
// or most commonly a series of deconflicting caches.
|
||||
type QuotaAccessor interface {
|
||||
// UpdateQuotaStatus is called to persist final status. This method should write to persistent storage.
|
||||
// An error indicates that write didn't complete successfully.
|
||||
UpdateQuotaStatus(newQuota *api.ResourceQuota) error
|
||||
|
||||
// GetQuotas gets all possible quotas for a given namespace
|
||||
GetQuotas(namespace string) ([]api.ResourceQuota, error)
|
||||
}
|
||||
|
||||
type quotaAccessor struct {
|
||||
client clientset.Interface
|
||||
|
||||
// indexer that holds quota objects by namespace
|
||||
indexer cache.Indexer
|
||||
reflector *cache.Reflector
|
||||
|
||||
// liveLookups holds the last few live lookups we've done to help ammortize cost on repeated lookup failures.
|
||||
// This let's us handle the case of latent caches, by looking up actual results for a namespace on cache miss/no results.
|
||||
// We track the lookup result here so that for repeated requests, we don't look it up very often.
|
||||
liveLookupCache *lru.Cache
|
||||
liveTTL time.Duration
|
||||
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
|
||||
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
|
||||
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
|
||||
updatedQuotas *lru.Cache
|
||||
}
|
||||
|
||||
// newQuotaAccessor creates an object that conforms to the QuotaAccessor interface to be used to retrieve quota objects.
|
||||
func newQuotaAccessor(client clientset.Interface) (*quotaAccessor, error) {
|
||||
liveLookupCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
updatedCache, err := lru.New(100)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lw := &cache.ListWatch{
|
||||
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
|
||||
},
|
||||
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
|
||||
return client.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
|
||||
},
|
||||
}
|
||||
indexer, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &api.ResourceQuota{}, 0)
|
||||
|
||||
return "aAccessor{
|
||||
client: client,
|
||||
indexer: indexer,
|
||||
reflector: reflector,
|
||||
liveLookupCache: liveLookupCache,
|
||||
liveTTL: time.Duration(30 * time.Second),
|
||||
updatedQuotas: updatedCache,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (e *quotaAccessor) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
e.reflector.RunUntil(stopCh)
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down quota accessor")
|
||||
}
|
||||
|
||||
func (e *quotaAccessor) UpdateQuotaStatus(newQuota *api.ResourceQuota) error {
|
||||
updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(newQuota)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key := newQuota.Namespace + "/" + newQuota.Name
|
||||
e.updatedQuotas.Add(key, updatedQuota)
|
||||
return nil
|
||||
}
|
||||
|
||||
var etcdVersioner = etcd.APIObjectVersioner{}
|
||||
|
||||
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
|
||||
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
|
||||
// being monotonically increasing integers
|
||||
func (e *quotaAccessor) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
|
||||
key := quota.Namespace + "/" + quota.Name
|
||||
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
|
||||
if !ok {
|
||||
return quota
|
||||
}
|
||||
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
|
||||
|
||||
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
|
||||
e.updatedQuotas.Remove(key)
|
||||
return quota
|
||||
}
|
||||
return cachedQuota
|
||||
}
|
||||
|
||||
func (e *quotaAccessor) GetQuotas(namespace string) ([]api.ResourceQuota, error) {
|
||||
// determine if there are any quotas in this namespace
|
||||
// if there are no quotas, we don't need to do anything
|
||||
items, err := e.indexer.Index("namespace", &api.ResourceQuota{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: ""}})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error resolving quota.")
|
||||
}
|
||||
|
||||
// if there are no items held in our indexer, check our live-lookup LRU, if that misses, do the live lookup to prime it.
|
||||
if len(items) == 0 {
|
||||
lruItemObj, ok := e.liveLookupCache.Get(namespace)
|
||||
if !ok || lruItemObj.(liveLookupEntry).expiry.Before(time.Now()) {
|
||||
// TODO: If there are multiple operations at the same time and cache has just expired,
|
||||
// this may cause multiple List operations being issued at the same time.
|
||||
// If there is already in-flight List() for a given namespace, we should wait until
|
||||
// it is finished and cache is updated instead of doing the same, also to avoid
|
||||
// throttling - see #22422 for details.
|
||||
liveList, err := e.client.Core().ResourceQuotas(namespace).List(api.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newEntry := liveLookupEntry{expiry: time.Now().Add(e.liveTTL)}
|
||||
for i := range liveList.Items {
|
||||
newEntry.items = append(newEntry.items, &liveList.Items[i])
|
||||
}
|
||||
e.liveLookupCache.Add(namespace, newEntry)
|
||||
lruItemObj = newEntry
|
||||
}
|
||||
lruEntry := lruItemObj.(liveLookupEntry)
|
||||
for i := range lruEntry.items {
|
||||
items = append(items, lruEntry.items[i])
|
||||
}
|
||||
}
|
||||
|
||||
resourceQuotas := []api.ResourceQuota{}
|
||||
for i := range items {
|
||||
quota := items[i].(*api.ResourceQuota)
|
||||
quota = e.checkCache(quota)
|
||||
// always make a copy. We're going to muck around with this and we should never mutate the originals
|
||||
resourceQuotas = append(resourceQuotas, *quota)
|
||||
}
|
||||
|
||||
return resourceQuotas, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user