Merge pull request #33464 from liggitt/terminating-namespace-check
Automatic merge from submit-queue

Fix cache expiration check

The check for whether an entry in the `forceLiveLookup` cache had expired was backwards. Fixed the logic and added tests.
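For context, a minimal standalone sketch (not part of this commit) of the inverted comparison: the old code forced a live lookup only when `expiry.Before(time.Now())` was true, which only happens *after* the TTL has elapsed, so the entry was honored exactly when it should have been ignored.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// An entry just added with a 30s TTL should be honored for the
	// next 30 seconds.
	expiry := time.Now().Add(30 * time.Second)

	// The old (backwards) check: true only once the TTL has already
	// elapsed, so the forced live lookup never fired while the entry
	// was fresh.
	fmt.Println("old check fires:", expiry.Before(time.Now())) // false

	// The intended condition: the entry is valid while now < expiry.
	fmt.Println("entry still valid:", time.Now().Before(expiry)) // true
}
```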
commit e02b73ff67

pkg/util/cache/lruexpirecache.go
@@ -23,13 +23,32 @@ import (
 	"github.com/golang/groupcache/lru"
 )
 
+// Clock defines an interface for obtaining the current time
+type Clock interface {
+	Now() time.Time
+}
+
+// realClock implements the Clock interface by calling time.Now()
+type realClock struct{}
+
+func (realClock) Now() time.Time { return time.Now() }
+
 type LRUExpireCache struct {
+	// clock is used to obtain the current time
+	clock Clock
+
 	cache *lru.Cache
 	lock  sync.Mutex
 }
 
+// NewLRUExpireCache creates an expiring cache with the given size
 func NewLRUExpireCache(maxSize int) *LRUExpireCache {
-	return &LRUExpireCache{cache: lru.New(maxSize)}
+	return &LRUExpireCache{clock: realClock{}, cache: lru.New(maxSize)}
 }
 
+// NewLRUExpireCacheWithClock creates an expiring cache with the given size, using the specified clock to obtain the current time
+func NewLRUExpireCacheWithClock(maxSize int, clock Clock) *LRUExpireCache {
+	return &LRUExpireCache{clock: clock, cache: lru.New(maxSize)}
+}
+
 type cacheEntry struct {
@@ -40,7 +59,7 @@ type cacheEntry struct {
 func (c *LRUExpireCache) Add(key lru.Key, value interface{}, ttl time.Duration) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	c.cache.Add(key, &cacheEntry{value, time.Now().Add(ttl)})
+	c.cache.Add(key, &cacheEntry{value, c.clock.Now().Add(ttl)})
 	// Remove entry from cache after ttl.
 	time.AfterFunc(ttl, func() { c.remove(key) })
 }
@@ -52,7 +71,7 @@ func (c *LRUExpireCache) Get(key lru.Key) (interface{}, bool) {
 	if !ok {
 		return nil, false
 	}
-	if time.Now().After(e.(*cacheEntry).expireTime) {
+	if c.clock.Now().After(e.(*cacheEntry).expireTime) {
 		go c.remove(key)
 		return nil, false
 	}
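A sketch of how the new clock-injectable constructor can be exercised; the `fakeClock` type below is a hypothetical stand-in for the fake in `pkg/util/clock` that the tests use, and the import path assumes this tree's `pkg/util/cache` package.

```go
package main

import (
	"fmt"
	"time"

	utilcache "k8s.io/kubernetes/pkg/util/cache"
)

// fakeClock satisfies the new Clock interface and advances only when
// told to, making expiry deterministic.
type fakeClock struct {
	now time.Time
}

func (f *fakeClock) Now() time.Time { return f.now }

func (f *fakeClock) Step(d time.Duration) { f.now = f.now.Add(d) }

func main() {
	fc := &fakeClock{now: time.Now()}
	c := utilcache.NewLRUExpireCacheWithClock(100, fc)

	// Get compares against the injected clock; the time.AfterFunc
	// cleanup inside Add still uses real time, which is harmless here.
	c.Add("ns", true, time.Minute)
	if _, ok := c.Get("ns"); ok {
		fmt.Println("entry visible while the fake clock is within the TTL")
	}

	fc.Step(time.Minute + time.Millisecond)
	if _, ok := c.Get("ns"); !ok {
		fmt.Println("entry expired once the fake clock passed the TTL")
	}
}
```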
plugin/pkg/admission/namespace/lifecycle/admission.go

@@ -22,7 +22,6 @@ import (
 	"time"
 
 	"github.com/golang/glog"
-	lru "github.com/hashicorp/golang-lru"
 
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
@@ -31,6 +30,8 @@ import (
 	"k8s.io/kubernetes/pkg/admission"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
+	utilcache "k8s.io/kubernetes/pkg/util/cache"
+	"k8s.io/kubernetes/pkg/util/clock"
 	"k8s.io/kubernetes/pkg/util/sets"
 )
 
@@ -62,7 +63,7 @@ type lifecycle struct {
 	namespaceInformer cache.SharedIndexInformer
 	// forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache.
 	// if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server.
-	forceLiveLookupCache *lru.Cache
+	forceLiveLookupCache *utilcache.LRUExpireCache
 }
 
 type forceLiveLookupEntry struct {
@@ -95,10 +96,7 @@ func (l *lifecycle) Admit(a admission.Attributes) error {
 	// is slow to update, we add the namespace into a force live lookup list to ensure
 	// we are not looking at stale state.
 	if a.GetOperation() == admission.Delete {
-		newEntry := forceLiveLookupEntry{
-			expiry: time.Now().Add(forceLiveLookupTTL),
-		}
-		l.forceLiveLookupCache.Add(a.GetName(), newEntry)
+		l.forceLiveLookupCache.Add(a.GetName(), true, forceLiveLookupTTL)
 	}
 	return nil
 }
@@ -135,8 +133,7 @@ func (l *lifecycle) Admit(a admission.Attributes) error {
 
 	// forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server.
 	forceLiveLookup := false
-	lruItemObj, ok := l.forceLiveLookupCache.Get(a.GetNamespace())
-	if ok && lruItemObj.(forceLiveLookupEntry).expiry.Before(time.Now()) {
+	if _, ok := l.forceLiveLookupCache.Get(a.GetNamespace()); ok {
 		// we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup.
 		forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive
 	}
@@ -170,10 +167,11 @@ func (l *lifecycle) Admit(a admission.Attributes) error {
 
 // NewLifecycle creates a new namespace lifecycle admission control handler
 func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) {
-	forceLiveLookupCache, err := lru.New(100)
-	if err != nil {
-		panic(err)
-	}
+	return newLifecycleWithClock(c, immortalNamespaces, clock.RealClock{})
+}
+
+func newLifecycleWithClock(c clientset.Interface, immortalNamespaces sets.String, clock utilcache.Clock) (admission.Interface, error) {
+	forceLiveLookupCache := utilcache.NewLRUExpireCacheWithClock(100, clock)
 	return &lifecycle{
 		Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete),
 		client:  c,
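A sketch of the design choice, under the same assumptions as the previous example: since `LRUExpireCache` now owns the TTL bookkeeping, the admission plugin can treat mere presence in the cache as "deleted within the last forceLiveLookupTTL", replacing the error-prone expiry comparison. The `nsName` value and the 30-second TTL are illustrative.

```go
package main

import (
	"fmt"
	"time"

	utilcache "k8s.io/kubernetes/pkg/util/cache"
)

func main() {
	cache := utilcache.NewLRUExpireCache(100)
	nsName := "doomed-namespace"

	// On namespace delete: record the name with the TTL. The cache,
	// not the caller, now owns the expiry bookkeeping.
	cache.Add(nsName, true, 30*time.Second)

	// On later admission checks: presence alone means the entry is
	// still within its TTL, so a live lookup should be forced.
	if _, ok := cache.Get(nsName); ok {
		fmt.Println("forcing live lookup for", nsName)
	}
}
```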
plugin/pkg/admission/namespace/lifecycle/admission_test.go

@@ -29,14 +29,20 @@ import (
 	"k8s.io/kubernetes/pkg/client/testing/core"
 	"k8s.io/kubernetes/pkg/controller/informers"
 	"k8s.io/kubernetes/pkg/runtime"
+	"k8s.io/kubernetes/pkg/util/clock"
 	"k8s.io/kubernetes/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
 // newHandlerForTest returns a configured handler for testing.
 func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
+	return newHandlerForTestWithClock(c, clock.RealClock{})
+}
+
+// newHandlerForTestWithClock returns a configured handler for testing.
+func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (admission.Interface, informers.SharedInformerFactory, error) {
 	f := informers.NewSharedInformerFactory(c, 5*time.Minute)
-	handler, err := NewLifecycle(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem))
+	handler, err := newLifecycleWithClock(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem), cacheClock)
 	if err != nil {
 		return nil, f, err
 	}
@@ -173,3 +179,80 @@ func TestAdmissionNamespaceTerminating(t *testing.T) {
 		t.Errorf("Did not expect an error %v", err)
 	}
 }
+
+// TestAdmissionNamespaceForceLiveLookup verifies live lookups are done after deleting a namespace
+func TestAdmissionNamespaceForceLiveLookup(t *testing.T) {
+	namespace := "test"
+	getCalls := int64(0)
+	phases := map[string]api.NamespacePhase{namespace: api.NamespaceActive}
+	mockClient := newMockClientForTest(phases)
+	mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) {
+		getCalls++
+		return true, &api.Namespace{ObjectMeta: api.ObjectMeta{Name: namespace}, Status: api.NamespaceStatus{Phase: phases[namespace]}}, nil
+	})
+
+	fakeClock := clock.NewFakeClock(time.Now())
+
+	handler, informerFactory, err := newHandlerForTestWithClock(mockClient, fakeClock)
+	if err != nil {
+		t.Errorf("unexpected error initializing handler: %v", err)
+	}
+	informerFactory.Start(wait.NeverStop)
+
+	pod := newPod(namespace)
+	// verify create operations in the namespace are allowed
+	err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
+	if err != nil {
+		t.Errorf("Unexpected error rejecting creates in an active namespace")
+	}
+	if getCalls != 0 {
+		t.Errorf("Expected no live lookups of the namespace, got %d", getCalls)
+	}
+	getCalls = 0
+
+	// verify delete of the namespace can proceed
+	err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Namespace").WithVersion("version"), "", namespace, api.Resource("namespaces").WithVersion("version"), "", admission.Delete, nil))
+	if err != nil {
+		t.Errorf("Expected namespace deletion to be allowed")
+	}
+	if getCalls != 0 {
+		t.Errorf("Expected no live lookups of the namespace, got %d", getCalls)
+	}
+	getCalls = 0
+
+	// simulate the phase changing
+	phases[namespace] = api.NamespaceTerminating
+
+	// verify create operations in the namespace cause an error
+	err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
+	if err == nil {
+		t.Errorf("Expected error rejecting creates in a namespace right after deleting it")
+	}
+	if getCalls != 1 {
+		t.Errorf("Expected a live lookup of the namespace at t=0, got %d", getCalls)
+	}
+	getCalls = 0
+
+	// Ensure the live lookup is still forced up to forceLiveLookupTTL
+	fakeClock.Step(forceLiveLookupTTL)
+
+	// verify create operations in the namespace cause an error
+	err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
+	if err == nil {
+		t.Errorf("Expected error rejecting creates in a namespace right after deleting it")
+	}
+	if getCalls != 1 {
+		t.Errorf("Expected a live lookup of the namespace at t=forceLiveLookupTTL, got %d", getCalls)
+	}
+	getCalls = 0
+
+	// Ensure the live lookup expires
+	fakeClock.Step(time.Millisecond)
+
+	// verify create operations in the namespace don't force a live lookup after the timeout
+	handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil))
+	if getCalls != 0 {
+		t.Errorf("Expected no live lookup of the namespace at t=forceLiveLookupTTL+1ms, got %d", getCalls)
+	}
+	getCalls = 0
+}
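A note on why the test steps the clock twice (an inference from the code above, not from the commit message): `Get` discards an entry only when the clock is strictly *after* its expiry, so at exactly t = forceLiveLookupTTL the entry is still served and the live lookup is still forced; one more millisecond tips it over. A standalone sketch of that boundary, with a 30-second stand-in for the TTL:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Unix(0, 0)
	ttl := 30 * time.Second // stand-in for forceLiveLookupTTL
	expireTime := start.Add(ttl)

	atTTL := start.Add(ttl)                      // clock stepped by exactly ttl
	pastTTL := start.Add(ttl + time.Millisecond) // stepped by ttl + 1ms

	// Get() drops an entry only when now is strictly after expireTime.
	fmt.Println("expired at t=ttl:    ", atTTL.After(expireTime))   // false: lookup still forced
	fmt.Println("expired at t=ttl+1ms:", pastTTL.After(expireTime)) // true: no longer forced
}
```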