Merge pull request #63963 from wojtek-t/collapse_configmap_manager
Automatic merge from submit-queue (batch tested with PRs 63598, 63913, 63459, 63963, 60464). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Refactor ConfigMapManager This is a follow up from https://github.com/kubernetes/kubernetes/pull/63857 to unify SecretManager with ConfigMap manager. This is more-or-less a copy of that PR for ConfigMapManager, with super minor changes in secretManager code.
This commit is contained in:
		| @@ -15,13 +15,12 @@ go_library( | ||||
|     importpath = "k8s.io/kubernetes/pkg/kubelet/configmap", | ||||
|     deps = [ | ||||
|         "//pkg/api/v1/pod:go_default_library", | ||||
|         "//pkg/kubelet/util:go_default_library", | ||||
|         "//pkg/kubelet/util/manager:go_default_library", | ||||
|         "//vendor/k8s.io/api/core/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/kubernetes:go_default_library", | ||||
|     ], | ||||
| ) | ||||
| @@ -44,13 +43,12 @@ go_test( | ||||
|     srcs = ["configmap_manager_test.go"], | ||||
|     embed = [":go_default_library"], | ||||
|     deps = [ | ||||
|         "//vendor/github.com/stretchr/testify/assert:go_default_library", | ||||
|         "//pkg/kubelet/util/manager:go_default_library", | ||||
|         "//vendor/k8s.io/api/core/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/kubernetes:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", | ||||
|         "//vendor/k8s.io/client-go/testing:go_default_library", | ||||
|     ], | ||||
| ) | ||||
|   | ||||
| @@ -18,26 +18,19 @@ package configmap | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	storageetcd "k8s.io/apiserver/pkg/storage/etcd" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	podutil "k8s.io/kubernetes/pkg/api/v1/pod" | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/util" | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/util/manager" | ||||
|  | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/clock" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	defaultTTL = time.Minute | ||||
| ) | ||||
|  | ||||
| type Manager interface { | ||||
| 	// Get configmap by configmap namespace and name. | ||||
| 	GetConfigMap(namespace, name string) (*v1.ConfigMap, error) | ||||
| @@ -73,191 +66,31 @@ func (s *simpleConfigMapManager) RegisterPod(pod *v1.Pod) { | ||||
| func (s *simpleConfigMapManager) UnregisterPod(pod *v1.Pod) { | ||||
| } | ||||
|  | ||||
| type GetObjectTTLFunc func() (time.Duration, bool) | ||||
|  | ||||
| type objectKey struct { | ||||
| 	namespace string | ||||
| 	name      string | ||||
| // configMapManager keeps a cache of all configmaps necessary | ||||
| // for registered pods. Different implementation of the store | ||||
| // may result in different semantics for freshness of configmaps | ||||
| // (e.g. ttl-based implementation vs watch-based implementation). | ||||
| type configMapManager struct { | ||||
| 	manager manager.Manager | ||||
| } | ||||
|  | ||||
| // configMapStoreItems is a single item stored in configMapStore. | ||||
| type configMapStoreItem struct { | ||||
| 	refCount  int | ||||
| 	configMap *configMapData | ||||
| } | ||||
|  | ||||
| type configMapData struct { | ||||
| 	sync.Mutex | ||||
|  | ||||
| 	configMap      *v1.ConfigMap | ||||
| 	err            error | ||||
| 	lastUpdateTime time.Time | ||||
| } | ||||
|  | ||||
| // configMapStore is a local cache of configmaps. | ||||
| type configMapStore struct { | ||||
| 	kubeClient clientset.Interface | ||||
| 	clock      clock.Clock | ||||
|  | ||||
| 	lock  sync.Mutex | ||||
| 	items map[objectKey]*configMapStoreItem | ||||
|  | ||||
| 	defaultTTL time.Duration | ||||
| 	getTTL     GetObjectTTLFunc | ||||
| } | ||||
|  | ||||
| func newConfigMapStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *configMapStore { | ||||
| 	return &configMapStore{ | ||||
| 		kubeClient: kubeClient, | ||||
| 		clock:      clock, | ||||
| 		items:      make(map[objectKey]*configMapStoreItem), | ||||
| 		defaultTTL: ttl, | ||||
| 		getTTL:     getTTL, | ||||
| func (c *configMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) { | ||||
| 	object, err := c.manager.GetObject(namespace, name) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func isConfigMapOlder(newConfigMap, oldConfigMap *v1.ConfigMap) bool { | ||||
| 	if newConfigMap == nil || oldConfigMap == nil { | ||||
| 		return false | ||||
| 	if configmap, ok := object.(*v1.ConfigMap); ok { | ||||
| 		return configmap, nil | ||||
| 	} | ||||
| 	newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newConfigMap) | ||||
| 	oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldConfigMap) | ||||
| 	return newVersion < oldVersion | ||||
| 	return nil, fmt.Errorf("unexpected object type: %v", object) | ||||
| } | ||||
|  | ||||
| func (s *configMapStore) Add(namespace, name string) { | ||||
| 	key := objectKey{namespace: namespace, name: name} | ||||
|  | ||||
| 	// Add is called from RegisterPod, thus it needs to be efficient. | ||||
| 	// Thus Add() is only increasing refCount and generation of a given configmap. | ||||
| 	// Then Get() is responsible for fetching if needed. | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
| 	item, exists := s.items[key] | ||||
| 	if !exists { | ||||
| 		item = &configMapStoreItem{ | ||||
| 			refCount:  0, | ||||
| 			configMap: &configMapData{}, | ||||
| 		} | ||||
| 		s.items[key] = item | ||||
| 	} | ||||
|  | ||||
| 	item.refCount++ | ||||
| 	// This will trigger fetch on the next Get() operation. | ||||
| 	item.configMap = nil | ||||
| func (c *configMapManager) RegisterPod(pod *v1.Pod) { | ||||
| 	c.manager.RegisterPod(pod) | ||||
| } | ||||
|  | ||||
| func (s *configMapStore) Delete(namespace, name string) { | ||||
| 	key := objectKey{namespace: namespace, name: name} | ||||
|  | ||||
| 	s.lock.Lock() | ||||
| 	defer s.lock.Unlock() | ||||
| 	if item, ok := s.items[key]; ok { | ||||
| 		item.refCount-- | ||||
| 		if item.refCount == 0 { | ||||
| 			delete(s.items, key) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc { | ||||
| 	return func() (time.Duration, bool) { | ||||
| 		node, err := getNode() | ||||
| 		if err != nil { | ||||
| 			return time.Duration(0), false | ||||
| 		} | ||||
| 		if node != nil && node.Annotations != nil { | ||||
| 			if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok { | ||||
| 				if intValue, err := strconv.Atoi(value); err == nil { | ||||
| 					return time.Duration(intValue) * time.Second, true | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		return time.Duration(0), false | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *configMapStore) isConfigMapFresh(data *configMapData) bool { | ||||
| 	configMapTTL := s.defaultTTL | ||||
| 	if ttl, ok := s.getTTL(); ok { | ||||
| 		configMapTTL = ttl | ||||
| 	} | ||||
| 	return s.clock.Now().Before(data.lastUpdateTime.Add(configMapTTL)) | ||||
| } | ||||
|  | ||||
| func (s *configMapStore) Get(namespace, name string) (*v1.ConfigMap, error) { | ||||
| 	key := objectKey{namespace: namespace, name: name} | ||||
|  | ||||
| 	data := func() *configMapData { | ||||
| 		s.lock.Lock() | ||||
| 		defer s.lock.Unlock() | ||||
| 		item, exists := s.items[key] | ||||
| 		if !exists { | ||||
| 			return nil | ||||
| 		} | ||||
| 		if item.configMap == nil { | ||||
| 			item.configMap = &configMapData{} | ||||
| 		} | ||||
| 		return item.configMap | ||||
| 	}() | ||||
| 	if data == nil { | ||||
| 		return nil, fmt.Errorf("configmap %q/%q not registered", namespace, name) | ||||
| 	} | ||||
|  | ||||
| 	// After updating data in configMapStore, lock the data, fetch configMap if | ||||
| 	// needed and return data. | ||||
| 	data.Lock() | ||||
| 	defer data.Unlock() | ||||
| 	if data.err != nil || !s.isConfigMapFresh(data) { | ||||
| 		opts := metav1.GetOptions{} | ||||
| 		if data.configMap != nil && data.err == nil { | ||||
| 			// This is just a periodic refresh of a configmap we successfully fetched previously. | ||||
| 			// In this case, server data from apiserver cache to reduce the load on both | ||||
| 			// etcd and apiserver (the cache is eventually consistent). | ||||
| 			util.FromApiserverCache(&opts) | ||||
| 		} | ||||
| 		configMap, err := s.kubeClient.CoreV1().ConfigMaps(namespace).Get(name, opts) | ||||
| 		if err != nil && !apierrors.IsNotFound(err) && data.configMap == nil && data.err == nil { | ||||
| 			// Couldn't fetch the latest configmap, but there is no cached data to return. | ||||
| 			// Return the fetch result instead. | ||||
| 			return configMap, err | ||||
| 		} | ||||
| 		if (err == nil && !isConfigMapOlder(configMap, data.configMap)) || apierrors.IsNotFound(err) { | ||||
| 			// If the fetch succeeded with a newer version of the configmap, or if the | ||||
| 			// configmap could not be found in the apiserver, update the cached data to | ||||
| 			// reflect the current status. | ||||
| 			data.configMap = configMap | ||||
| 			data.err = err | ||||
| 			data.lastUpdateTime = s.clock.Now() | ||||
| 		} | ||||
| 	} | ||||
| 	return data.configMap, data.err | ||||
| } | ||||
|  | ||||
| // cachingConfigMapManager keeps a cache of all configmaps necessary for registered pods. | ||||
| // It implements the following logic: | ||||
| // - whenever a pod is created or updated, the cached versions of all its configmaps | ||||
| //   are invalidated | ||||
| // - every GetConfigMap() call tries to fetch the value from local cache; if it is | ||||
| //   not there, invalidated or too old, we fetch it from apiserver and refresh the | ||||
| //   value in cache; otherwise it is just fetched from cache | ||||
| type cachingConfigMapManager struct { | ||||
| 	configMapStore *configMapStore | ||||
|  | ||||
| 	lock           sync.Mutex | ||||
| 	registeredPods map[objectKey]*v1.Pod | ||||
| } | ||||
|  | ||||
| func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager { | ||||
| 	csm := &cachingConfigMapManager{ | ||||
| 		configMapStore: newConfigMapStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL), | ||||
| 		registeredPods: make(map[objectKey]*v1.Pod), | ||||
| 	} | ||||
| 	return csm | ||||
| } | ||||
|  | ||||
| func (c *cachingConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) { | ||||
| 	return c.configMapStore.Get(namespace, name) | ||||
| func (c *configMapManager) UnregisterPod(pod *v1.Pod) { | ||||
| 	c.manager.UnregisterPod(pod) | ||||
| } | ||||
|  | ||||
| func getConfigMapNames(pod *v1.Pod) sets.String { | ||||
| @@ -269,39 +102,24 @@ func getConfigMapNames(pod *v1.Pod) sets.String { | ||||
| 	return result | ||||
| } | ||||
|  | ||||
| func (c *cachingConfigMapManager) RegisterPod(pod *v1.Pod) { | ||||
| 	names := getConfigMapNames(pod) | ||||
| 	c.lock.Lock() | ||||
| 	defer c.lock.Unlock() | ||||
| 	for name := range names { | ||||
| 		c.configMapStore.Add(pod.Namespace, name) | ||||
| 	} | ||||
| 	var prev *v1.Pod | ||||
| 	key := objectKey{namespace: pod.Namespace, name: pod.Name} | ||||
| 	prev = c.registeredPods[key] | ||||
| 	c.registeredPods[key] = pod | ||||
| 	if prev != nil { | ||||
| 		for name := range getConfigMapNames(prev) { | ||||
| 			// On an update, the .Add() call above will have re-incremented the | ||||
| 			// ref count of any existing items, so any configmaps that are in both | ||||
| 			// names and prev need to have their ref counts decremented. Any that | ||||
| 			// are only in prev need to be completely removed. This unconditional | ||||
| 			// call takes care of both cases. | ||||
| 			c.configMapStore.Delete(prev.Namespace, name) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| const ( | ||||
| 	defaultTTL = time.Minute | ||||
| ) | ||||
|  | ||||
| func (c *cachingConfigMapManager) UnregisterPod(pod *v1.Pod) { | ||||
| 	var prev *v1.Pod | ||||
| 	key := objectKey{namespace: pod.Namespace, name: pod.Name} | ||||
| 	c.lock.Lock() | ||||
| 	defer c.lock.Unlock() | ||||
| 	prev = c.registeredPods[key] | ||||
| 	delete(c.registeredPods, key) | ||||
| 	if prev != nil { | ||||
| 		for name := range getConfigMapNames(prev) { | ||||
| 			c.configMapStore.Delete(prev.Namespace, name) | ||||
| 		} | ||||
| // NewCachingConfigMapManager creates a manager that keeps a cache of all configmaps | ||||
| // necessary for registered pods. | ||||
| // It implement the following logic: | ||||
| // - whenever a pod is create or updated, the cached versions of all configmaps | ||||
| //   are invalidated | ||||
| // - every GetObject() call tries to fetch the value from local cache; if it is | ||||
| //   not there, invalidated or too old, we fetch it from apiserver and refresh the | ||||
| //   value in cache; otherwise it is just fetched from cache | ||||
| func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager { | ||||
| 	getConfigMap := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { | ||||
| 		return kubeClient.CoreV1().ConfigMaps(namespace).Get(name, opts) | ||||
| 	} | ||||
| 	configMapStore := manager.NewObjectStore(getConfigMap, clock.RealClock{}, getTTL, defaultTTL) | ||||
| 	return &configMapManager{ | ||||
| 		manager: manager.NewCacheBasedManager(configMapStore, getConfigMapNames), | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -18,30 +18,27 @@ package configmap | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"k8s.io/api/core/v1" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
|  | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/clock" | ||||
| 	core "k8s.io/client-go/testing" | ||||
|  | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| 	"k8s.io/client-go/kubernetes/fake" | ||||
| 	"k8s.io/kubernetes/pkg/kubelet/util/manager" | ||||
| ) | ||||
|  | ||||
| func checkConfigMap(t *testing.T, store *configMapStore, ns, name string, shouldExist bool) { | ||||
| func checkObject(t *testing.T, store manager.Store, ns, name string, shouldExist bool) { | ||||
| 	_, err := store.Get(ns, name) | ||||
| 	if shouldExist && err != nil { | ||||
| 		t.Errorf("unexpected actions: %#v", err) | ||||
| 	} | ||||
| 	if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("configmap %q/%q not registered", ns, name))) { | ||||
| 	if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("object %q/%q not registered", ns, name))) { | ||||
| 		t.Errorf("unexpected actions: %#v", err) | ||||
| 	} | ||||
| } | ||||
| @@ -50,242 +47,9 @@ func noObjectTTL() (time.Duration, bool) { | ||||
| 	return time.Duration(0), false | ||||
| } | ||||
|  | ||||
| func TestConfigMapStore(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) | ||||
| 	store.Add("ns1", "name1") | ||||
| 	store.Add("ns2", "name2") | ||||
| 	store.Add("ns1", "name1") | ||||
| 	store.Add("ns1", "name1") | ||||
| 	store.Delete("ns1", "name1") | ||||
| 	store.Delete("ns2", "name2") | ||||
| 	store.Add("ns3", "name3") | ||||
|  | ||||
| 	// Adds don't issue Get requests. | ||||
| 	actions := fakeClient.Actions() | ||||
| 	assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) | ||||
| 	// Should issue Get request | ||||
| 	store.Get("ns1", "name1") | ||||
| 	// Shouldn't issue Get request, as configMap is not registered | ||||
| 	store.Get("ns2", "name2") | ||||
| 	// Should issue Get request | ||||
| 	store.Get("ns3", "name3") | ||||
|  | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions) | ||||
|  | ||||
| 	for _, a := range actions { | ||||
| 		assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a) | ||||
| 	} | ||||
|  | ||||
| 	checkConfigMap(t, store, "ns1", "name1", true) | ||||
| 	checkConfigMap(t, store, "ns2", "name2", false) | ||||
| 	checkConfigMap(t, store, "ns3", "name3", true) | ||||
| 	checkConfigMap(t, store, "ns4", "name4", false) | ||||
| } | ||||
|  | ||||
| func TestConfigMapStoreDeletingConfigMap(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) | ||||
| 	store.Add("ns", "name") | ||||
|  | ||||
| 	result := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}} | ||||
| 	fakeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { | ||||
| 		return true, result, nil | ||||
| 	}) | ||||
| 	configMap, err := store.Get("ns", "name") | ||||
| 	if err != nil { | ||||
| 		t.Errorf("Unexpected error: %v", err) | ||||
| 	} | ||||
| 	if !reflect.DeepEqual(configMap, result) { | ||||
| 		t.Errorf("Unexpected configMap: %v", configMap) | ||||
| 	} | ||||
|  | ||||
| 	fakeClient.PrependReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { | ||||
| 		return true, &v1.ConfigMap{}, apierrors.NewNotFound(v1.Resource("configMap"), "name") | ||||
| 	}) | ||||
| 	configMap, err = store.Get("ns", "name") | ||||
| 	if err == nil || !apierrors.IsNotFound(err) { | ||||
| 		t.Errorf("Unexpected error: %v", err) | ||||
| 	} | ||||
| 	if !reflect.DeepEqual(configMap, &v1.ConfigMap{}) { | ||||
| 		t.Errorf("Unexpected configMap: %v", configMap) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestConfigMapStoreGetAlwaysRefresh(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	fakeClock := clock.NewFakeClock(time.Now()) | ||||
| 	store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, 0) | ||||
|  | ||||
| 	for i := 0; i < 10; i++ { | ||||
| 		store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) | ||||
| 	} | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	wg.Add(100) | ||||
| 	for i := 0; i < 100; i++ { | ||||
| 		go func(i int) { | ||||
| 			store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) | ||||
| 			wg.Done() | ||||
| 		}(i) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	actions := fakeClient.Actions() | ||||
| 	assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions) | ||||
|  | ||||
| 	for _, a := range actions { | ||||
| 		assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestConfigMapStoreGetNeverRefresh(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	fakeClock := clock.NewFakeClock(time.Now()) | ||||
| 	store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) | ||||
|  | ||||
| 	for i := 0; i < 10; i++ { | ||||
| 		store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i)) | ||||
| 	} | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	wg.Add(100) | ||||
| 	for i := 0; i < 100; i++ { | ||||
| 		go func(i int) { | ||||
| 			store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10)) | ||||
| 			wg.Done() | ||||
| 		}(i) | ||||
| 	} | ||||
| 	wg.Wait() | ||||
| 	actions := fakeClient.Actions() | ||||
| 	// Only first Get, should forward the Get request. | ||||
| 	assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions) | ||||
| } | ||||
|  | ||||
| func TestCustomTTL(t *testing.T) { | ||||
| 	ttl := time.Duration(0) | ||||
| 	ttlExists := false | ||||
| 	customTTL := func() (time.Duration, bool) { | ||||
| 		return ttl, ttlExists | ||||
| 	} | ||||
|  | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	fakeClock := clock.NewFakeClock(time.Time{}) | ||||
| 	store := newConfigMapStore(fakeClient, fakeClock, customTTL, time.Minute) | ||||
|  | ||||
| 	store.Add("ns", "name") | ||||
| 	store.Get("ns", "name") | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	// Set 0-ttl and see if that works. | ||||
| 	ttl = time.Duration(0) | ||||
| 	ttlExists = true | ||||
| 	store.Get("ns", "name") | ||||
| 	actions := fakeClient.Actions() | ||||
| 	assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	// Set 5-minute ttl and see if this works. | ||||
| 	ttl = time.Duration(5) * time.Minute | ||||
| 	store.Get("ns", "name") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) | ||||
| 	// Still no effect after 4 minutes. | ||||
| 	fakeClock.Step(4 * time.Minute) | ||||
| 	store.Get("ns", "name") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) | ||||
| 	// Now it should have an effect. | ||||
| 	fakeClock.Step(time.Minute) | ||||
| 	store.Get("ns", "name") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	// Now remove the custom ttl and see if that works. | ||||
| 	ttlExists = false | ||||
| 	fakeClock.Step(55 * time.Second) | ||||
| 	store.Get("ns", "name") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions) | ||||
| 	// Pass the minute and it should be triggered now. | ||||
| 	fakeClock.Step(5 * time.Second) | ||||
| 	store.Get("ns", "name") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions) | ||||
| } | ||||
|  | ||||
| func TestParseNodeAnnotation(t *testing.T) { | ||||
| 	testCases := []struct { | ||||
| 		node   *v1.Node | ||||
| 		err    error | ||||
| 		exists bool | ||||
| 		ttl    time.Duration | ||||
| 	}{ | ||||
| 		{ | ||||
| 			node:   nil, | ||||
| 			err:    fmt.Errorf("error"), | ||||
| 			exists: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			node: &v1.Node{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name: "node", | ||||
| 				}, | ||||
| 			}, | ||||
| 			exists: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			node: &v1.Node{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node", | ||||
| 					Annotations: map[string]string{}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			exists: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			node: &v1.Node{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node", | ||||
| 					Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			exists: false, | ||||
| 		}, | ||||
| 		{ | ||||
| 			node: &v1.Node{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node", | ||||
| 					Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			exists: true, | ||||
| 			ttl:    time.Duration(0), | ||||
| 		}, | ||||
| 		{ | ||||
| 			node: &v1.Node{ | ||||
| 				ObjectMeta: metav1.ObjectMeta{ | ||||
| 					Name:        "node", | ||||
| 					Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"}, | ||||
| 				}, | ||||
| 			}, | ||||
| 			exists: true, | ||||
| 			ttl:    time.Minute, | ||||
| 		}, | ||||
| 	} | ||||
| 	for i, testCase := range testCases { | ||||
| 		getNode := func() (*v1.Node, error) { return testCase.node, testCase.err } | ||||
| 		ttl, exists := GetObjectTTLFromNodeFunc(getNode)() | ||||
| 		if exists != testCase.exists { | ||||
| 			t.Errorf("%d: incorrect parsing: %t", i, exists) | ||||
| 			continue | ||||
| 		} | ||||
| 		if exists && ttl != testCase.ttl { | ||||
| 			t.Errorf("%d: incorrect ttl: %v", i, ttl) | ||||
| 		} | ||||
| func getConfigMap(fakeClient clientset.Interface) manager.GetObjectFunc { | ||||
| 	return func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) { | ||||
| 		return fakeClient.CoreV1().ConfigMaps(namespace).Get(name, opts) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -348,152 +112,11 @@ func podWithConfigMaps(ns, podName string, toAttach configMapsToAttach) *v1.Pod | ||||
| 	return pod | ||||
| } | ||||
|  | ||||
| func TestCacheInvalidation(t *testing.T) { | ||||
| func TestCacheBasedConfigMapManager(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	fakeClock := clock.NewFakeClock(time.Now()) | ||||
| 	store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) | ||||
| 	manager := &cachingConfigMapManager{ | ||||
| 		configMapStore: store, | ||||
| 		registeredPods: make(map[objectKey]*v1.Pod), | ||||
| 	} | ||||
|  | ||||
| 	// Create a pod with some configMaps. | ||||
| 	s1 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, | ||||
| 			{envVarNames: []string{"s2"}}, | ||||
| 		}, | ||||
| 	} | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) | ||||
| 	// Fetch both configMaps - this should triggger get operations. | ||||
| 	store.Get("ns1", "s1") | ||||
| 	store.Get("ns1", "s10") | ||||
| 	store.Get("ns1", "s2") | ||||
| 	actions := fakeClient.Actions() | ||||
| 	assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	// Update a pod with a new configMap. | ||||
| 	s2 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s1"}}, | ||||
| 			{envVarNames: []string{"s2"}, envFromNames: []string{"s20"}}, | ||||
| 		}, | ||||
| 		volumes: []string{"s3"}, | ||||
| 	} | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name1", s2)) | ||||
| 	// All configMaps should be invalidated - this should trigger get operations. | ||||
| 	store.Get("ns1", "s1") | ||||
| 	store.Get("ns1", "s2") | ||||
| 	store.Get("ns1", "s20") | ||||
| 	store.Get("ns1", "s3") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 4, len(actions), "unexpected actions: %#v", actions) | ||||
| 	fakeClient.ClearActions() | ||||
|  | ||||
| 	// Create a new pod that is refencing the first three configMaps - those should | ||||
| 	// be invalidated. | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1)) | ||||
| 	store.Get("ns1", "s1") | ||||
| 	store.Get("ns1", "s10") | ||||
| 	store.Get("ns1", "s2") | ||||
| 	store.Get("ns1", "s20") | ||||
| 	store.Get("ns1", "s3") | ||||
| 	actions = fakeClient.Actions() | ||||
| 	assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions) | ||||
| 	fakeClient.ClearActions() | ||||
| } | ||||
|  | ||||
| func TestCacheRefcounts(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	fakeClock := clock.NewFakeClock(time.Now()) | ||||
| 	store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute) | ||||
| 	manager := &cachingConfigMapManager{ | ||||
| 		configMapStore: store, | ||||
| 		registeredPods: make(map[objectKey]*v1.Pod), | ||||
| 	} | ||||
|  | ||||
| 	s1 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}}, | ||||
| 			{envVarNames: []string{"s2"}}, | ||||
| 		}, | ||||
| 		volumes: []string{"s3"}, | ||||
| 	} | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1)) | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1)) | ||||
| 	s2 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s4"}}, | ||||
| 			{envVarNames: []string{"s5"}, envFromNames: []string{"s50"}}, | ||||
| 		}, | ||||
| 	} | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name2", s2)) | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name3", s2)) | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name4", s2)) | ||||
| 	manager.UnregisterPod(podWithConfigMaps("ns1", "name3", s2)) | ||||
| 	s3 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s3"}, envFromNames: []string{"s30"}}, | ||||
| 			{envVarNames: []string{"s5"}}, | ||||
| 		}, | ||||
| 	} | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name5", s3)) | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name6", s3)) | ||||
| 	s4 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s6"}}, | ||||
| 			{envFromNames: []string{"s60"}}, | ||||
| 		}, | ||||
| 	} | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "name7", s4)) | ||||
| 	manager.UnregisterPod(podWithConfigMaps("ns1", "name7", s4)) | ||||
|  | ||||
| 	// Also check the Add + Update + Remove scenario. | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s1)) | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s2)) | ||||
| 	manager.UnregisterPod(podWithConfigMaps("ns1", "other-name", s2)) | ||||
|  | ||||
| 	s5 := configMapsToAttach{ | ||||
| 		containerEnvConfigMaps: []envConfigMaps{ | ||||
| 			{envVarNames: []string{"s7"}}, | ||||
| 			{envFromNames: []string{"s70"}}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	// Check the no-op update scenario | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "noop-pod", s5)) | ||||
| 	manager.RegisterPod(podWithConfigMaps("ns1", "noop-pod", s5)) | ||||
|  | ||||
| 	refs := func(ns, name string) int { | ||||
| 		store.lock.Lock() | ||||
| 		defer store.lock.Unlock() | ||||
| 		item, ok := store.items[objectKey{ns, name}] | ||||
| 		if !ok { | ||||
| 			return 0 | ||||
| 		} | ||||
| 		return item.refCount | ||||
| 	} | ||||
| 	assert.Equal(t, 1, refs("ns1", "s1")) | ||||
| 	assert.Equal(t, 1, refs("ns1", "s10")) | ||||
| 	assert.Equal(t, 1, refs("ns1", "s2")) | ||||
| 	assert.Equal(t, 3, refs("ns1", "s3")) | ||||
| 	assert.Equal(t, 2, refs("ns1", "s30")) | ||||
| 	assert.Equal(t, 2, refs("ns1", "s4")) | ||||
| 	assert.Equal(t, 4, refs("ns1", "s5")) | ||||
| 	assert.Equal(t, 2, refs("ns1", "s50")) | ||||
| 	assert.Equal(t, 0, refs("ns1", "s6")) | ||||
| 	assert.Equal(t, 0, refs("ns1", "s60")) | ||||
| 	assert.Equal(t, 1, refs("ns1", "s7")) | ||||
| 	assert.Equal(t, 1, refs("ns1", "s70")) | ||||
| } | ||||
|  | ||||
| func TestCachingConfigMapManager(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	configMapStore := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0) | ||||
| 	manager := &cachingConfigMapManager{ | ||||
| 		configMapStore: configMapStore, | ||||
| 		registeredPods: make(map[objectKey]*v1.Pod), | ||||
| 	store := manager.NewObjectStore(getConfigMap(fakeClient), clock.RealClock{}, noObjectTTL, 0) | ||||
| 	manager := &configMapManager{ | ||||
| 		manager: manager.NewCacheBasedManager(store, getConfigMapNames), | ||||
| 	} | ||||
|  | ||||
| 	// Create a pod with some configMaps. | ||||
| @@ -543,7 +166,7 @@ func TestCachingConfigMapManager(t *testing.T) { | ||||
|  | ||||
| 	for _, ns := range []string{"ns1", "ns2", "ns3"} { | ||||
| 		for _, configMap := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} { | ||||
| 			checkConfigMap(t, configMapStore, ns, configMap, shouldExist(ns, configMap)) | ||||
| 			checkObject(t, store, ns, configMap, shouldExist(ns, configMap)) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -547,7 +547,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, | ||||
| 	klet.secretManager = secretManager | ||||
|  | ||||
| 	configMapManager := configmap.NewCachingConfigMapManager( | ||||
| 		kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode)) | ||||
| 		kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode)) | ||||
| 	klet.configMapManager = configMapManager | ||||
|  | ||||
| 	if klet.experimentalHostUserNamespaceDefaulting { | ||||
|   | ||||
| @@ -66,16 +66,16 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) { | ||||
| func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) { | ||||
| } | ||||
|  | ||||
| // cachingSecretManager keeps a store with secrets necessary | ||||
| // secretManager keeps a store with secrets necessary | ||||
| // for registered pods. Different implementations of the store | ||||
| // may result in different semantics for freshness of secrets | ||||
| // (e.g. ttl-based implementation vs watch-based implementation). | ||||
| type cachingSecretManager struct { | ||||
| type secretManager struct { | ||||
| 	manager manager.Manager | ||||
| } | ||||
|  | ||||
| func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) { | ||||
| 	object, err := c.manager.GetObject(namespace, name) | ||||
| func (s *secretManager) GetSecret(namespace, name string) (*v1.Secret, error) { | ||||
| 	object, err := s.manager.GetObject(namespace, name) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| @@ -85,12 +85,12 @@ func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, er | ||||
| 	return nil, fmt.Errorf("unexpected object type: %v", object) | ||||
| } | ||||
|  | ||||
| func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) { | ||||
| 	c.manager.RegisterPod(pod) | ||||
| func (s *secretManager) RegisterPod(pod *v1.Pod) { | ||||
| 	s.manager.RegisterPod(pod) | ||||
| } | ||||
|  | ||||
| func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) { | ||||
| 	c.manager.UnregisterPod(pod) | ||||
| func (s *secretManager) UnregisterPod(pod *v1.Pod) { | ||||
| 	s.manager.UnregisterPod(pod) | ||||
| } | ||||
|  | ||||
| func getSecretNames(pod *v1.Pod) sets.String { | ||||
| @@ -106,7 +106,7 @@ const ( | ||||
| 	defaultTTL = time.Minute | ||||
| ) | ||||
|  | ||||
| // NewCacheBasedManager creates a manager that keeps a cache of all secrets | ||||
| // NewCachingSecretManager creates a manager that keeps a cache of all secrets | ||||
| // necessary for registered pods. | ||||
| // It implements the following logic: | ||||
| // - whenever a pod is created or updated, the cached versions of all secrets | ||||
| @@ -119,7 +119,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO | ||||
| 		return kubeClient.CoreV1().Secrets(namespace).Get(name, opts) | ||||
| 	} | ||||
| 	secretStore := manager.NewObjectStore(getSecret, clock.RealClock{}, getTTL, defaultTTL) | ||||
| 	return &cachingSecretManager{ | ||||
| 	return &secretManager{ | ||||
| 		manager: manager.NewCacheBasedManager(secretStore, getSecretNames), | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -108,7 +108,7 @@ func podWithSecrets(ns, podName string, toAttach secretsToAttach) *v1.Pod { | ||||
| func TestCacheBasedSecretManager(t *testing.T) { | ||||
| 	fakeClient := &fake.Clientset{} | ||||
| 	store := manager.NewObjectStore(getSecret(fakeClient), clock.RealClock{}, noObjectTTL, 0) | ||||
| 	manager := &cachingSecretManager{ | ||||
| 	manager := &secretManager{ | ||||
| 		manager: manager.NewCacheBasedManager(store, getSecretNames), | ||||
| 	} | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue