apiserver: tri-state watch cache capacity: off, default, value
This commit is contained in:
		| @@ -11,10 +11,7 @@ go_library( | ||||
|     name = "go_default_library", | ||||
|     srcs = ["cachesize.go"], | ||||
|     tags = ["automanaged"], | ||||
|     deps = [ | ||||
|         "//vendor/github.com/golang/glog:go_default_library", | ||||
|         "//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", | ||||
|     ], | ||||
|     deps = ["//vendor/github.com/golang/glog:go_default_library"], | ||||
| ) | ||||
|  | ||||
| filegroup( | ||||
|   | ||||
| @@ -23,8 +23,6 @@ import ( | ||||
| 	"strings" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
|  | ||||
| 	"k8s.io/apiserver/pkg/registry/generic/registry" | ||||
| ) | ||||
|  | ||||
| type Resource string | ||||
| @@ -111,11 +109,13 @@ func SetWatchCacheSizes(cacheSizes []string) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func GetWatchCacheSizeByResource(resource string) int { // TODO this should use schema.GroupResource for lookups | ||||
| // GetWatchCacheSizeByResource returns the configured watch cache size for the given resource. | ||||
| // A nil value means to use a default size, zero means to disable caching. | ||||
| func GetWatchCacheSizeByResource(resource string) (ret *int) { // TODO this should use schema.GroupResource for lookups | ||||
| 	if value, found := watchCacheSizes[Resource(resource)]; found { | ||||
| 		return value | ||||
| 		return &value | ||||
| 	} | ||||
| 	return registry.DefaultWatchCacheSize | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func maxInt(a, b int) int { | ||||
|   | ||||
| @@ -40,7 +40,7 @@ func NewREST(optsGetter generic.RESTOptionsGetter) *REST { | ||||
| 	} | ||||
|  | ||||
| 	// We explicitly do NOT do any decoration here yet. // TODO determine why we do not want to cache here | ||||
| 	opts.Decorator = generic.UndecoratedStorage // TODO use watchCacheSize=-1 to signal UndecoratedStorage | ||||
| 	opts.Decorator = generic.UndecoratedStorage | ||||
|  | ||||
| 	store := &genericregistry.Store{ | ||||
| 		Copier:            api.Scheme, | ||||
|   | ||||
| @@ -25,41 +25,49 @@ import ( | ||||
| 	"k8s.io/apiserver/pkg/storage/storagebackend/factory" | ||||
| ) | ||||
|  | ||||
| var _ generic.StorageDecorator = StorageWithCacher | ||||
|  | ||||
| // Creates a cacher based given storageConfig. | ||||
| func StorageWithCacher( | ||||
| 	copier runtime.ObjectCopier, | ||||
| 	storageConfig *storagebackend.Config, | ||||
| 	capacity int, | ||||
| 	objectType runtime.Object, | ||||
| 	resourcePrefix string, | ||||
| 	keyFunc func(obj runtime.Object) (string, error), | ||||
| 	newListFunc func() runtime.Object, | ||||
| 	getAttrsFunc storage.AttrFunc, | ||||
| 	triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { | ||||
| func StorageWithCacher(defaultCapacity int) generic.StorageDecorator { | ||||
| 	return func( | ||||
| 		copier runtime.ObjectCopier, | ||||
| 		storageConfig *storagebackend.Config, | ||||
| 		requestedSize *int, | ||||
| 		objectType runtime.Object, | ||||
| 		resourcePrefix string, | ||||
| 		keyFunc func(obj runtime.Object) (string, error), | ||||
| 		newListFunc func() runtime.Object, | ||||
| 		getAttrsFunc storage.AttrFunc, | ||||
| 		triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) { | ||||
|  | ||||
| 	s, d := generic.NewRawStorage(storageConfig) | ||||
| 	// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. | ||||
| 	// Currently it has two layers of same storage interface -- cacher and low level kv. | ||||
| 	cacherConfig := storage.CacherConfig{ | ||||
| 		CacheCapacity:        capacity, | ||||
| 		Storage:              s, | ||||
| 		Versioner:            etcdstorage.APIObjectVersioner{}, | ||||
| 		Copier:               copier, | ||||
| 		Type:                 objectType, | ||||
| 		ResourcePrefix:       resourcePrefix, | ||||
| 		KeyFunc:              keyFunc, | ||||
| 		NewListFunc:          newListFunc, | ||||
| 		GetAttrsFunc:         getAttrsFunc, | ||||
| 		TriggerPublisherFunc: triggerFunc, | ||||
| 		Codec:                storageConfig.Codec, | ||||
| 	} | ||||
| 	cacher := storage.NewCacherFromConfig(cacherConfig) | ||||
| 	destroyFunc := func() { | ||||
| 		cacher.Stop() | ||||
| 		d() | ||||
| 	} | ||||
| 		capacity := defaultCapacity | ||||
| 		if requestedSize != nil && *requestedSize == 0 { | ||||
| 			panic("StorageWithCacher must not be called with zero cache size") | ||||
| 		} | ||||
| 		if requestedSize != nil { | ||||
| 			capacity = *requestedSize | ||||
| 		} | ||||
|  | ||||
| 	return cacher, destroyFunc | ||||
| 		s, d := generic.NewRawStorage(storageConfig) | ||||
| 		// TODO: we would change this later to make storage always have cacher and hide low level KV layer inside. | ||||
| 		// Currently it has two layers of same storage interface -- cacher and low level kv. | ||||
| 		cacherConfig := storage.CacherConfig{ | ||||
| 			CacheCapacity:        capacity, | ||||
| 			Storage:              s, | ||||
| 			Versioner:            etcdstorage.APIObjectVersioner{}, | ||||
| 			Copier:               copier, | ||||
| 			Type:                 objectType, | ||||
| 			ResourcePrefix:       resourcePrefix, | ||||
| 			KeyFunc:              keyFunc, | ||||
| 			NewListFunc:          newListFunc, | ||||
| 			GetAttrsFunc:         getAttrsFunc, | ||||
| 			TriggerPublisherFunc: triggerFunc, | ||||
| 			Codec:                storageConfig.Codec, | ||||
| 		} | ||||
| 		cacher := storage.NewCacherFromConfig(cacherConfig) | ||||
| 		destroyFunc := func() { | ||||
| 			cacher.Stop() | ||||
| 			d() | ||||
| 		} | ||||
|  | ||||
| 		return cacher, destroyFunc | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -44,9 +44,6 @@ import ( | ||||
| 	"github.com/golang/glog" | ||||
| ) | ||||
|  | ||||
| // defaultWatchCacheSize is the default size of a watch catch per resource in number of entries. | ||||
| const DefaultWatchCacheSize = 100 | ||||
|  | ||||
| // ObjectFunc is a function to act on a given object. An error may be returned | ||||
| // if the hook cannot be completed. An ObjectFunc may transform the provided | ||||
| // object. | ||||
| @@ -164,9 +161,9 @@ type Store struct { | ||||
| 	// Called to cleanup clients used by the underlying Storage; optional. | ||||
| 	DestroyFunc func() | ||||
| 	// Maximum size of the watch history cached in memory, in number of entries. | ||||
| 	// A zero value here means that a default is used. This value is ignored if | ||||
| 	// Storage is non-nil. | ||||
| 	WatchCacheSize int | ||||
| 	// This value is ignored if Storage is non-nil. Nil is replaced with a default value. | ||||
| 	// A zero integer will disable caching. | ||||
| 	WatchCacheSize *int | ||||
| } | ||||
|  | ||||
| // Note: the rest.StandardStorage interface aggregates the common REST verbs | ||||
| @@ -1205,14 +1202,10 @@ func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error { | ||||
| 	} | ||||
|  | ||||
| 	if e.Storage == nil { | ||||
| 		capacity := DefaultWatchCacheSize | ||||
| 		if e.WatchCacheSize != 0 { | ||||
| 			capacity = e.WatchCacheSize | ||||
| 		} | ||||
| 		e.Storage, e.DestroyFunc = opts.Decorator( | ||||
| 			e.Copier, | ||||
| 			opts.StorageConfig, | ||||
| 			capacity, | ||||
| 			e.WatchCacheSize, | ||||
| 			e.NewFunc(), | ||||
| 			prefix, | ||||
| 			keyFunc, | ||||
|   | ||||
| @@ -26,10 +26,11 @@ import ( | ||||
|  | ||||
| // StorageDecorator is a function signature for producing a storage.Interface | ||||
| // and an associated DestroyFunc from given parameters. | ||||
| // A zero capacity means to disable caching, nil means to use a default. | ||||
| type StorageDecorator func( | ||||
| 	copier runtime.ObjectCopier, | ||||
| 	config *storagebackend.Config, | ||||
| 	capacity int, | ||||
| 	capacity *int, | ||||
| 	objectType runtime.Object, | ||||
| 	resourcePrefix string, | ||||
| 	keyFunc func(obj runtime.Object) (string, error), | ||||
| @@ -42,7 +43,7 @@ type StorageDecorator func( | ||||
| func UndecoratedStorage( | ||||
| 	copier runtime.ObjectCopier, | ||||
| 	config *storagebackend.Config, | ||||
| 	capacity int, | ||||
| 	capacity *int, | ||||
| 	objectType runtime.Object, | ||||
| 	resourcePrefix string, | ||||
| 	keyFunc func(obj runtime.Object) (string, error), | ||||
|   | ||||
| @@ -39,7 +39,11 @@ type EtcdOptions struct { | ||||
| 	DefaultStorageMediaType string | ||||
| 	DeleteCollectionWorkers int | ||||
| 	EnableGarbageCollection bool | ||||
| 	EnableWatchCache        bool | ||||
|  | ||||
| 	// Set EnableWatchCache to false to disable all watch caches | ||||
| 	EnableWatchCache bool | ||||
| 	// Set DefaultWatchCacheSize to zero to disable watch caches for those resources that have no explicit cache size set | ||||
| 	DefaultWatchCacheSize int | ||||
| } | ||||
|  | ||||
| func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions { | ||||
| @@ -49,6 +53,7 @@ func NewEtcdOptions(backendConfig *storagebackend.Config) *EtcdOptions { | ||||
| 		DeleteCollectionWorkers: 1, | ||||
| 		EnableGarbageCollection: true, | ||||
| 		EnableWatchCache:        true, | ||||
| 		DefaultWatchCacheSize:   100, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -129,7 +134,7 @@ func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) | ||||
| 		ResourcePrefix:          f.Options.StorageConfig.Prefix + "/" + resource.Group + "/" + resource.Resource, | ||||
| 	} | ||||
| 	if f.Options.EnableWatchCache { | ||||
| 		ret.Decorator = genericregistry.StorageWithCacher | ||||
| 		ret.Decorator = genericregistry.StorageWithCacher(f.Options.DefaultWatchCacheSize) | ||||
| 	} | ||||
| 	return ret, nil | ||||
| } | ||||
| @@ -153,7 +158,7 @@ func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupR | ||||
| 		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource), | ||||
| 	} | ||||
| 	if f.Options.EnableWatchCache { | ||||
| 		ret.Decorator = genericregistry.StorageWithCacher | ||||
| 		ret.Decorator = genericregistry.StorageWithCacher(f.Options.DefaultWatchCacheSize) | ||||
| 	} | ||||
|  | ||||
| 	return ret, nil | ||||
|   | ||||
| @@ -38,7 +38,6 @@ func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST | ||||
| 		NewListFunc:       func() runtime.Object { return &apiregistration.APIServiceList{} }, | ||||
| 		PredicateFunc:     apiservice.MatchAPIService, | ||||
| 		QualifiedResource: apiregistration.Resource("apiservices"), | ||||
| 		WatchCacheSize:    100, | ||||
|  | ||||
| 		CreateStrategy: strategy, | ||||
| 		UpdateStrategy: strategy, | ||||
|   | ||||
| @@ -353,6 +353,7 @@ type CustomResourceRESTOptionsGetter struct { | ||||
| 	StorageConfig           storagebackend.Config | ||||
| 	StoragePrefix           string | ||||
| 	EnableWatchCache        bool | ||||
| 	DefaultWatchCacheSize   int | ||||
| 	EnableGarbageCollection bool | ||||
| 	DeleteCollectionWorkers int | ||||
| } | ||||
| @@ -366,7 +367,7 @@ func (t CustomResourceRESTOptionsGetter) GetRESTOptions(resource schema.GroupRes | ||||
| 		ResourcePrefix:          t.StoragePrefix + "/" + resource.Group + "/" + resource.Resource, | ||||
| 	} | ||||
| 	if t.EnableWatchCache { | ||||
| 		ret.Decorator = genericregistry.StorageWithCacher | ||||
| 		ret.Decorator = genericregistry.StorageWithCacher(t.DefaultWatchCacheSize) | ||||
| 	} | ||||
| 	return ret, nil | ||||
| } | ||||
|   | ||||
| @@ -99,6 +99,7 @@ func (o CustomResourcesServerOptions) Config() (*apiserver.Config, error) { | ||||
| 		StorageConfig:           o.RecommendedOptions.Etcd.StorageConfig, | ||||
| 		StoragePrefix:           o.RecommendedOptions.Etcd.StorageConfig.Prefix, | ||||
| 		EnableWatchCache:        o.RecommendedOptions.Etcd.EnableWatchCache, | ||||
| 		DefaultWatchCacheSize:   o.RecommendedOptions.Etcd.DefaultWatchCacheSize, | ||||
| 		EnableGarbageCollection: o.RecommendedOptions.Etcd.EnableGarbageCollection, | ||||
| 		DeleteCollectionWorkers: o.RecommendedOptions.Etcd.DeleteCollectionWorkers, | ||||
| 	} | ||||
|   | ||||
| @@ -80,6 +80,7 @@ func DefaultServerConfig() (*extensionsapiserver.Config, error) { | ||||
| 		StorageConfig:           options.RecommendedOptions.Etcd.StorageConfig, | ||||
| 		StoragePrefix:           options.RecommendedOptions.Etcd.StorageConfig.Prefix, | ||||
| 		EnableWatchCache:        options.RecommendedOptions.Etcd.EnableWatchCache, | ||||
| 		DefaultWatchCacheSize:   options.RecommendedOptions.Etcd.DefaultWatchCacheSize, | ||||
| 		EnableGarbageCollection: options.RecommendedOptions.Etcd.EnableGarbageCollection, | ||||
| 		DeleteCollectionWorkers: options.RecommendedOptions.Etcd.DeleteCollectionWorkers, | ||||
| 	} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Dr. Stefan Schimanski
					Dr. Stefan Schimanski