diff --git a/pkg/kubelet/container/runtime_cache.go b/pkg/kubelet/container/runtime_cache.go index 4f19851b34b..1bbc5651a90 100644 --- a/pkg/kubelet/container/runtime_cache.go +++ b/pkg/kubelet/container/runtime_cache.go @@ -46,6 +46,17 @@ func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) { }, nil } +// runtimeCache caches a list of pods. It records a timestamp (cacheTime) right +// before updating the pods, so the timestamp is at most as new as the pods +// (and can be slightly older). The timestamp always moves forward. Callers are +// expected not to modify the pods returned from GetPods. +// The pod updates can be triggered by a request (e.g., GetPods or +// ForceUpdateIfOlder) if the cached pods are considered stale. These requests +// will be blocked until the cache update is completed. To reduce the cache miss +// penalty, upon a miss, runtimeCache would start a separate goroutine +// (updatingThread) if one does not exist, to periodically updates the cache. +// updatingThread would stop after a period of inactivity (no incoming requests) +// to conserve resources. type runtimeCache struct { sync.Mutex // The underlying container runtime used to update the cache. @@ -60,8 +71,8 @@ type runtimeCache struct { updatingThreadStopTime time.Time } -// GetPods returns the cached result for ListPods if the result is not -// outdated, otherwise it will retrieve the newest result. +// GetPods returns the cached pods if they are not outdated; otherwise, it +// retrieves the latest pods and return them. // If the cache updating loop has stopped, this function will restart it. func (r *runtimeCache) GetPods() ([]*Pod, error) { r.Lock() @@ -90,23 +101,37 @@ func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error } func (r *runtimeCache) updateCache() error { - pods, err := r.getter.GetPods(false) + pods, timestamp, err := r.getPodsWithTimestamp() if err != nil { return err } - r.pods = pods - r.cacheTime = time.Now() + r.writePodsIfNewer(pods, timestamp) return nil } +// getPodsWithTimestamp records a timestamp and retrieves pods from the getter. +func (r *runtimeCache) getPodsWithTimestamp() ([]*Pod, time.Time, error) { + // Always record the timestamp before getting the pods to avoid stale pods. + timestamp := time.Now() + pods, err := r.getter.GetPods(false) + return pods, timestamp, err +} + +// writePodsIfNewer writes the pods and timestamp if they are newer than the +// cached ones. +func (r *runtimeCache) writePodsIfNewer(pods []*Pod, timestamp time.Time) { + if timestamp.After(r.cacheTime) { + r.pods, r.cacheTime = pods, timestamp + } +} + // startUpdateingCache continues to invoke GetPods to get the newest result until // there is no requests within the default cache period. func (r *runtimeCache) startUpdatingCache() { run := true for run { time.Sleep(defaultUpdateInterval) - pods, err := r.getter.GetPods(false) - cacheTime := time.Now() + pods, timestamp, err := r.getPodsWithTimestamp() if err != nil { continue } @@ -116,8 +141,7 @@ func (r *runtimeCache) startUpdatingCache() { r.updating = false run = false } - r.pods = pods - r.cacheTime = cacheTime + r.writePodsIfNewer(pods, timestamp) r.Unlock() } } diff --git a/pkg/kubelet/container/runtime_cache_test.go b/pkg/kubelet/container/runtime_cache_test.go new file mode 100644 index 00000000000..fffb9e8f567 --- /dev/null +++ b/pkg/kubelet/container/runtime_cache_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "reflect" + "testing" + "time" +) + +// testRunTimeCache embeds runtimeCache with some additional methods for +// testing. +type testRuntimeCache struct { + runtimeCache +} + +func (r *testRuntimeCache) updateCacheWithLock() error { + r.Lock() + defer r.Unlock() + return r.updateCache() +} + +func (r *testRuntimeCache) getCachedPods() []*Pod { + r.Lock() + defer r.Unlock() + return r.pods +} + +func newTestRuntimeCache(getter podsGetter) *testRuntimeCache { + c, _ := NewRuntimeCache(getter) + return &testRuntimeCache{*c.(*runtimeCache)} +} + +func TestGetPods(t *testing.T) { + runtime := &FakeRuntime{} + expected := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}} + runtime.Podlist = expected + cache := newTestRuntimeCache(runtime) + actual, err := cache.GetPods() + if err != nil { + t.Errorf("unexpected error %v", err) + } + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } +} + +func TestForceUpdateIfOlder(t *testing.T) { + runtime := &FakeRuntime{} + cache := newTestRuntimeCache(runtime) + + // Cache old pods. + oldpods := []*Pod{{ID: "1111"}} + runtime.Podlist = oldpods + cache.updateCacheWithLock() + + // Update the runtime to new pods. + newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}} + runtime.Podlist = newpods + + // An older timestamp should not force an update. + cache.ForceUpdateIfOlder(time.Now().Add(-20 * time.Minute)) + actual := cache.getCachedPods() + if !reflect.DeepEqual(oldpods, actual) { + t.Errorf("expected %#v, got %#v", oldpods, actual) + } + + // A newer timestamp should force an update. + cache.ForceUpdateIfOlder(time.Now().Add(20 * time.Second)) + actual = cache.getCachedPods() + if !reflect.DeepEqual(newpods, actual) { + t.Errorf("expected %#v, got %#v", newpods, actual) + } +} + +func TestUpdatePodsOnlyIfNewer(t *testing.T) { + runtime := &FakeRuntime{} + cache := newTestRuntimeCache(runtime) + + // Cache new pods with a future timestamp. + newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}} + cache.Lock() + cache.pods = newpods + cache.cacheTime = time.Now().Add(20 * time.Minute) + cache.Unlock() + + // Instruct runime to return a list of old pods. + oldpods := []*Pod{{ID: "1111"}} + runtime.Podlist = oldpods + + // Try to update the cache; the attempt should not succeed because the + // cache timestamp is newer than the current time. + cache.updateCacheWithLock() + actual := cache.getCachedPods() + if !reflect.DeepEqual(newpods, actual) { + t.Errorf("expected %#v, got %#v", newpods, actual) + } +}