diff --git a/pkg/util/cache/lruexpirecache.go b/pkg/util/cache/lruexpirecache.go index 99e62cdc672..9c87ba7d3f6 100644 --- a/pkg/util/cache/lruexpirecache.go +++ b/pkg/util/cache/lruexpirecache.go @@ -23,13 +23,32 @@ import ( "github.com/golang/groupcache/lru" ) +// Clock defines an interface for obtaining the current time +type Clock interface { + Now() time.Time +} + +// realClock implements the Clock interface by calling time.Now() +type realClock struct{} + +func (realClock) Now() time.Time { return time.Now() } + type LRUExpireCache struct { + // clock is used to obtain the current time + clock Clock + cache *lru.Cache lock sync.Mutex } +// NewLRUExpireCache creates an expiring cache with the given size func NewLRUExpireCache(maxSize int) *LRUExpireCache { - return &LRUExpireCache{cache: lru.New(maxSize)} + return &LRUExpireCache{clock: realClock{}, cache: lru.New(maxSize)} +} + +// NewLRUExpireCache creates an expiring cache with the given size, using the specified clock to obtain the current time +func NewLRUExpireCacheWithClock(maxSize int, clock Clock) *LRUExpireCache { + return &LRUExpireCache{clock: clock, cache: lru.New(maxSize)} } type cacheEntry struct { @@ -40,7 +59,7 @@ type cacheEntry struct { func (c *LRUExpireCache) Add(key lru.Key, value interface{}, ttl time.Duration) { c.lock.Lock() defer c.lock.Unlock() - c.cache.Add(key, &cacheEntry{value, time.Now().Add(ttl)}) + c.cache.Add(key, &cacheEntry{value, c.clock.Now().Add(ttl)}) // Remove entry from cache after ttl. time.AfterFunc(ttl, func() { c.remove(key) }) } @@ -52,7 +71,7 @@ func (c *LRUExpireCache) Get(key lru.Key) (interface{}, bool) { if !ok { return nil, false } - if time.Now().After(e.(*cacheEntry).expireTime) { + if c.clock.Now().After(e.(*cacheEntry).expireTime) { go c.remove(key) return nil, false } diff --git a/plugin/pkg/admission/namespace/lifecycle/admission.go b/plugin/pkg/admission/namespace/lifecycle/admission.go index a083df8ba93..98f08100814 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission.go @@ -22,7 +22,6 @@ import ( "time" "github.com/golang/glog" - lru "github.com/hashicorp/golang-lru" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -31,6 +30,8 @@ import ( "k8s.io/kubernetes/pkg/admission" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + utilcache "k8s.io/kubernetes/pkg/util/cache" + "k8s.io/kubernetes/pkg/util/clock" "k8s.io/kubernetes/pkg/util/sets" ) @@ -62,7 +63,7 @@ type lifecycle struct { namespaceInformer cache.SharedIndexInformer // forceLiveLookupCache holds a list of entries for namespaces that we have a strong reason to believe are stale in our local cache. // if a namespace is in this cache, then we will ignore our local state and always fetch latest from api server. - forceLiveLookupCache *lru.Cache + forceLiveLookupCache *utilcache.LRUExpireCache } type forceLiveLookupEntry struct { @@ -95,10 +96,7 @@ func (l *lifecycle) Admit(a admission.Attributes) error { // is slow to update, we add the namespace into a force live lookup list to ensure // we are not looking at stale state. if a.GetOperation() == admission.Delete { - newEntry := forceLiveLookupEntry{ - expiry: time.Now().Add(forceLiveLookupTTL), - } - l.forceLiveLookupCache.Add(a.GetName(), newEntry) + l.forceLiveLookupCache.Add(a.GetName(), true, forceLiveLookupTTL) } return nil } @@ -135,8 +133,7 @@ func (l *lifecycle) Admit(a admission.Attributes) error { // forceLiveLookup if true will skip looking at local cache state and instead always make a live call to server. forceLiveLookup := false - lruItemObj, ok := l.forceLiveLookupCache.Get(a.GetNamespace()) - if ok && lruItemObj.(forceLiveLookupEntry).expiry.Before(time.Now()) { + if _, ok := l.forceLiveLookupCache.Get(a.GetNamespace()); ok { // we think the namespace was marked for deletion, but our current local cache says otherwise, we will force a live lookup. forceLiveLookup = exists && namespaceObj.(*api.Namespace).Status.Phase == api.NamespaceActive } @@ -170,10 +167,11 @@ func (l *lifecycle) Admit(a admission.Attributes) error { // NewLifecycle creates a new namespace lifecycle admission control handler func NewLifecycle(c clientset.Interface, immortalNamespaces sets.String) (admission.Interface, error) { - forceLiveLookupCache, err := lru.New(100) - if err != nil { - panic(err) - } + return newLifecycleWithClock(c, immortalNamespaces, clock.RealClock{}) +} + +func newLifecycleWithClock(c clientset.Interface, immortalNamespaces sets.String, clock utilcache.Clock) (admission.Interface, error) { + forceLiveLookupCache := utilcache.NewLRUExpireCacheWithClock(100, clock) return &lifecycle{ Handler: admission.NewHandler(admission.Create, admission.Update, admission.Delete), client: c, diff --git a/plugin/pkg/admission/namespace/lifecycle/admission_test.go b/plugin/pkg/admission/namespace/lifecycle/admission_test.go index 196e705d4dc..6fd750c1fa5 100644 --- a/plugin/pkg/admission/namespace/lifecycle/admission_test.go +++ b/plugin/pkg/admission/namespace/lifecycle/admission_test.go @@ -29,14 +29,20 @@ import ( "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/clock" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" ) // newHandlerForTest returns a configured handler for testing. func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { + return newHandlerForTestWithClock(c, clock.RealClock{}) +} + +// newHandlerForTestWithClock returns a configured handler for testing. +func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (admission.Interface, informers.SharedInformerFactory, error) { f := informers.NewSharedInformerFactory(c, 5*time.Minute) - handler, err := NewLifecycle(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem)) + handler, err := newLifecycleWithClock(c, sets.NewString(api.NamespaceDefault, api.NamespaceSystem), cacheClock) if err != nil { return nil, f, err } @@ -173,3 +179,80 @@ func TestAdmissionNamespaceTerminating(t *testing.T) { t.Errorf("Did not expect an error %v", err) } } + +// TestAdmissionNamespaceForceLiveLookup verifies live lookups are done after deleting a namespace +func TestAdmissionNamespaceForceLiveLookup(t *testing.T) { + namespace := "test" + getCalls := int64(0) + phases := map[string]api.NamespacePhase{namespace: api.NamespaceActive} + mockClient := newMockClientForTest(phases) + mockClient.AddReactor("get", "namespaces", func(action core.Action) (bool, runtime.Object, error) { + getCalls++ + return true, &api.Namespace{ObjectMeta: api.ObjectMeta{Name: namespace}, Status: api.NamespaceStatus{Phase: phases[namespace]}}, nil + }) + + fakeClock := clock.NewFakeClock(time.Now()) + + handler, informerFactory, err := newHandlerForTestWithClock(mockClient, fakeClock) + if err != nil { + t.Errorf("unexpected error initializing handler: %v", err) + } + informerFactory.Start(wait.NeverStop) + + pod := newPod(namespace) + // verify create operations in the namespace is allowed + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err != nil { + t.Errorf("Unexpected error rejecting creates in an active namespace") + } + if getCalls != 0 { + t.Errorf("Expected no live lookups of the namespace, got %d", getCalls) + } + getCalls = 0 + + // verify delete of namespace can proceed + err = handler.Admit(admission.NewAttributesRecord(nil, nil, api.Kind("Namespace").WithVersion("version"), "", namespace, api.Resource("namespaces").WithVersion("version"), "", admission.Delete, nil)) + if err != nil { + t.Errorf("Expected namespace deletion to be allowed") + } + if getCalls != 0 { + t.Errorf("Expected no live lookups of the namespace, got %d", getCalls) + } + getCalls = 0 + + // simulate the phase changing + phases[namespace] = api.NamespaceTerminating + + // verify create operations in the namespace cause an error + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err == nil { + t.Errorf("Expected error rejecting creates in a namespace right after deleting it") + } + if getCalls != 1 { + t.Errorf("Expected a live lookup of the namespace at t=0, got %d", getCalls) + } + getCalls = 0 + + // Ensure the live lookup is still forced up to forceLiveLookupTTL + fakeClock.Step(forceLiveLookupTTL) + + // verify create operations in the namespace cause an error + err = handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if err == nil { + t.Errorf("Expected error rejecting creates in a namespace right after deleting it") + } + if getCalls != 1 { + t.Errorf("Expected a live lookup of the namespace at t=forceLiveLookupTTL, got %d", getCalls) + } + getCalls = 0 + + // Ensure the live lookup expires + fakeClock.Step(time.Millisecond) + + // verify create operations in the namespace don't force a live lookup after the timeout + handler.Admit(admission.NewAttributesRecord(&pod, nil, api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "", admission.Create, nil)) + if getCalls != 0 { + t.Errorf("Expected no live lookup of the namespace at t=forceLiveLookupTTL+1ms, got %d", getCalls) + } + getCalls = 0 +}