reduce conflict retries

This commit is contained in:
deads2k
2016-05-02 10:48:06 -04:00
parent 8f104a7b0f
commit 02c0181f26
3 changed files with 86 additions and 3 deletions

View File

@@ -71,4 +71,28 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e
} }
// APIObjectVersioner implements Versioner // APIObjectVersioner implements Versioner
var _ storage.Versioner = APIObjectVersioner{} var Versioner storage.Versioner = APIObjectVersioner{}
// CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
// but etcd resource versions are special, they're actually ints, so we can easily compare them.
func (a APIObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
lhsVersion, err := Versioner.ObjectResourceVersion(lhs)
if err != nil {
// coder error
panic(err)
}
rhsVersion, err := Versioner.ObjectResourceVersion(rhs)
if err != nil {
// coder error
panic(err)
}
if lhsVersion == rhsVersion {
return 0
}
if lhsVersion < rhsVersion {
return -1
}
return 1
}

View File

@@ -39,3 +39,20 @@ func TestObjectVersioner(t *testing.T) {
t.Errorf("unexpected resource version: %#v", obj) t.Errorf("unexpected resource version: %#v", obj)
} }
} }
func TestCompareResourceVersion(t *testing.T) {
five := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "5"}}
six := &storagetesting.TestResource{ObjectMeta: api.ObjectMeta{ResourceVersion: "6"}}
versioner := APIObjectVersioner{}
if e, a := -1, versioner.CompareResourceVersion(five, six); e != a {
t.Errorf("expected %v got %v", e, a)
}
if e, a := 1, versioner.CompareResourceVersion(six, five); e != a {
t.Errorf("expected %v got %v", e, a)
}
if e, a := 0, versioner.CompareResourceVersion(six, six); e != a {
t.Errorf("expected %v got %v", e, a)
}
}

View File

@@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/quota" "k8s.io/kubernetes/pkg/quota"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage/etcd"
utilruntime "k8s.io/kubernetes/pkg/util/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
@@ -52,6 +53,10 @@ type quotaEvaluator struct {
// We track the lookup result here so that for repeated requests, we don't look it up very often. // We track the lookup result here so that for repeated requests, we don't look it up very often.
liveLookupCache *lru.Cache liveLookupCache *lru.Cache
liveTTL time.Duration liveTTL time.Duration
// updatedQuotas holds a cache of quotas that we've updated. This is used to pull the "really latest" during back to
// back quota evaluations that touch the same quota doc. This only works because we can compare etcd resourceVersions
// for the same resource as integers. Before this change: 22 updates with 12 conflicts. after this change: 15 updates with 0 conflicts
updatedQuotas *lru.Cache
// TODO these are used together to bucket items by namespace and then batch them up for processing. // TODO these are used together to bucket items by namespace and then batch them up for processing.
// The technique is valuable for rollup activities to avoid fanout and reduce resource contention. // The technique is valuable for rollup activities to avoid fanout and reduce resource contention.
@@ -101,6 +106,10 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
if err != nil { if err != nil {
return nil, err return nil, err
} }
updatedCache, err := lru.New(100)
if err != nil {
return nil, err
}
lw := &cache.ListWatch{ lw := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) { ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().ResourceQuotas(api.NamespaceAll).List(options) return client.Core().ResourceQuotas(api.NamespaceAll).List(options)
@@ -118,6 +127,7 @@ func newQuotaEvaluator(client clientset.Interface, registry quota.Registry) (*qu
registry: registry, registry: registry,
liveLookupCache: liveLookupCache, liveLookupCache: liveLookupCache,
liveTTL: time.Duration(30 * time.Second), liveTTL: time.Duration(30 * time.Second),
updatedQuotas: updatedCache,
queue: workqueue.New(), queue: workqueue.New(),
work: map[string][]*admissionWaiter{}, work: map[string][]*admissionWaiter{},
@@ -247,9 +257,14 @@ func (e *quotaEvaluator) checkQuotas(quotas []api.ResourceQuota, admissionAttrib
continue continue
} }
if _, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil { if updatedQuota, err := e.client.Core().ResourceQuotas(newQuota.Namespace).UpdateStatus(&newQuota); err != nil {
updatedFailedQuotas = append(updatedFailedQuotas, newQuota) updatedFailedQuotas = append(updatedFailedQuotas, newQuota)
lastErr = err lastErr = err
} else {
// update our cache
e.updateCache(updatedQuota)
} }
} }
@@ -472,6 +487,31 @@ func (e *quotaEvaluator) getWork() (string, []*admissionWaiter, bool) {
return ns, []*admissionWaiter{}, false return ns, []*admissionWaiter{}, false
} }
func (e *quotaEvaluator) updateCache(quota *api.ResourceQuota) {
key := quota.Namespace + "/" + quota.Name
e.updatedQuotas.Add(key, quota)
}
var etcdVersioner = etcd.APIObjectVersioner{}
// checkCache compares the passed quota against the value in the look-aside cache and returns the newer
// if the cache is out of date, it deletes the stale entry. This only works because of etcd resourceVersions
// being monotonically increasing integers
func (e *quotaEvaluator) checkCache(quota *api.ResourceQuota) *api.ResourceQuota {
key := quota.Namespace + "/" + quota.Name
uncastCachedQuota, ok := e.updatedQuotas.Get(key)
if !ok {
return quota
}
cachedQuota := uncastCachedQuota.(*api.ResourceQuota)
if etcdVersioner.CompareResourceVersion(quota, cachedQuota) >= 0 {
e.updatedQuotas.Remove(key)
return quota
}
return cachedQuota
}
func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) { func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error) {
// determine if there are any quotas in this namespace // determine if there are any quotas in this namespace
// if there are no quotas, we don't need to do anything // if there are no quotas, we don't need to do anything
@@ -508,8 +548,10 @@ func (e *quotaEvaluator) getQuotas(namespace string) ([]api.ResourceQuota, error
resourceQuotas := []api.ResourceQuota{} resourceQuotas := []api.ResourceQuota{}
for i := range items { for i := range items {
quota := items[i].(*api.ResourceQuota)
quota = e.checkCache(quota)
// always make a copy. We're going to muck around with this and we should never mutate the originals // always make a copy. We're going to muck around with this and we should never mutate the originals
resourceQuotas = append(resourceQuotas, *items[i].(*api.ResourceQuota)) resourceQuotas = append(resourceQuotas, *quota)
} }
return resourceQuotas, nil return resourceQuotas, nil