
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package manager

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"

	"k8s.io/klog/v2"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/utils/clock"
)

type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
type newObjectFunc func() runtime.Object
type isImmutableFunc func(runtime.Object) bool

// objectCacheItem is a single item stored in objectCache.
type objectCacheItem struct {
	refCount  int
	store     *cacheStore
	reflector *cache.Reflector

	hasSynced func() (bool, error)

	// waitGroup is used to ensure that there won't be two concurrent calls to reflector.Run
	waitGroup sync.WaitGroup

	// lock ensures that access to and modification of lastAccessTime, stopped, and immutable
	// are thread-safe, and protects against closing stopCh multiple times.
	lock           sync.Mutex
	lastAccessTime time.Time
	stopped        bool
	immutable      bool
	stopCh         chan struct{}
}
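
// stop stops the underlying reflector in a thread-safe way and reports whether
// it was running; it returns false if the item had already been stopped.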
func (i *objectCacheItem) stop() bool {
	i.lock.Lock()
	defer i.lock.Unlock()
	return i.stopThreadUnsafe()
}
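
// stopThreadUnsafe closes stopCh and marks the item as stopped. Callers must
// hold i.lock. For mutable objects the backing store is marked as no longer
// synced, so a restarted reflector has to relist before Get can succeed.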
func (i *objectCacheItem) stopThreadUnsafe() bool {
	if i.stopped {
		return false
	}
	i.stopped = true
	close(i.stopCh)
	if !i.immutable {
		i.store.unsetInitialized()
	}
	return true
}

func (i *objectCacheItem) setLastAccessTime(time time.Time) {
	i.lock.Lock()
	defer i.lock.Unlock()
	i.lastAccessTime = time
}

func (i *objectCacheItem) setImmutable() {
	i.lock.Lock()
	defer i.lock.Unlock()
	i.immutable = true
}
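
// stopIfIdle stops the reflector if the item has not been accessed for at
// least maxIdleTime and its store has already synced.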
func (i *objectCacheItem) stopIfIdle(now time.Time, maxIdleTime time.Duration) bool {
	i.lock.Lock()
	defer i.lock.Unlock()
	// Ensure that we don't try to stop a reflector that hasn't been initialized yet.
	// With an overloaded kube-apiserver, if the list request is already being
	// processed, all that work would be lost and would have to be retried.
	if !i.stopped && i.store.hasSynced() && now.After(i.lastAccessTime.Add(maxIdleTime)) {
		return i.stopThreadUnsafe()
	}
	return false
}
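
// restartReflectorIfNeeded restarts the reflector for a previously stopped,
// mutable item so that a subsequent Get can observe fresh data.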
func (i *objectCacheItem) restartReflectorIfNeeded() {
	i.lock.Lock()
	defer i.lock.Unlock()
	if i.immutable || !i.stopped {
		return
	}
	i.stopCh = make(chan struct{})
	i.stopped = false
	go i.startReflector()
}
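
// startReflector runs the reflector until stopCh is closed. The wait group
// ensures that two reflector.Run calls never overlap after a restart.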
func (i *objectCacheItem) startReflector() {
	i.waitGroup.Wait()
	i.waitGroup.Add(1)
	defer i.waitGroup.Done()
	i.reflector.Run(i.stopCh)
}

// cacheStore wraps cache.Store so that Replace can be overridden to mark the
// store as initialized once the first full list has been delivered.
type cacheStore struct {
	cache.Store
	lock        sync.Mutex
	initialized bool
}
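
// Replace delegates to the embedded store and, on success, marks the store as
// initialized (synced).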
func (c *cacheStore) Replace(list []interface{}, resourceVersion string) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	err := c.Store.Replace(list, resourceVersion)
	if err != nil {
		return err
	}
	c.initialized = true
	return nil
}

func (c *cacheStore) hasSynced() bool {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.initialized
}

func (c *cacheStore) unsetInitialized() {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.initialized = false
}

// objectCache is a local cache of objects propagated via
// individual watches.
type objectCache struct {
	listObject    listObjectFunc
	watchObject   watchObjectFunc
	newObject     newObjectFunc
	isImmutable   isImmutableFunc
	groupResource schema.GroupResource
	clock         clock.Clock
	maxIdleTime   time.Duration

	lock    sync.RWMutex
	items   map[objectKey]*objectCacheItem
	stopped bool
}

const minIdleTime = 1 * time.Minute

// NewObjectCache returns a new watch-based instance of the Store interface.
func NewObjectCache(
	listObject listObjectFunc,
	watchObject watchObjectFunc,
	newObject newObjectFunc,
	isImmutable isImmutableFunc,
	groupResource schema.GroupResource,
	clock clock.Clock,
	maxIdleTime time.Duration,
	stopCh <-chan struct{}) Store {

	if maxIdleTime < minIdleTime {
		maxIdleTime = minIdleTime
	}

	store := &objectCache{
		listObject:    listObject,
		watchObject:   watchObject,
		newObject:     newObject,
		isImmutable:   isImmutable,
		groupResource: groupResource,
		clock:         clock,
		maxIdleTime:   maxIdleTime,
		items:         make(map[objectKey]*objectCacheItem),
	}

	go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh)
	go store.shutdownWhenStopped(stopCh)
	return store
}

func (c *objectCache) newStore() *cacheStore {
	// TODO: We may consider creating a dedicated store keeping just a single
	// item, instead of using a generic store implementation for this purpose.
	// However, simple benchmarks show that the memory overhead in that case
	// would only decrease from ~600B to ~300B per object. So we are not
	// optimizing until we see a good reason to do so.
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	return &cacheStore{store, sync.Mutex{}, false}
}
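
// newReflectorLocked creates a per-object store and reflector that list and
// watch exactly one object (selected by metadata.name) in the given namespace.
// Callers must hold c.lock.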
func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheItem {
	fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		options.FieldSelector = fieldSelector
		return c.listObject(namespace, options)
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.FieldSelector = fieldSelector
		return c.watchObject(namespace, options)
	}
	store := c.newStore()
	reflector := cache.NewNamedReflector(
		fmt.Sprintf("object-%q/%q", namespace, name),
		&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
		c.newObject(),
		store,
		0,
	)
	item := &objectCacheItem{
		refCount:  0,
		store:     store,
		reflector: reflector,
		hasSynced: func() (bool, error) { return store.hasSynced(), nil },
		stopCh:    make(chan struct{}),
	}

	// Don't start reflector if Kubelet is already shutting down.
	if !c.stopped {
		go item.startReflector()
	}
	return item
}
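
// AddReference registers another pod-level reference to the given object,
// starting an individual watch on first use.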
func (c *objectCache) AddReference(namespace, name string) {
	key := objectKey{namespace: namespace, name: name}

	// AddReference is called from RegisterPod, thus it needs to be efficient.
	// Therefore it only increments refCount and, on the first registration of
	// a given object, starts the corresponding reflector.
	// It is the responsibility of the first Get operation to wait until the
	// reflector has populated the store.
	c.lock.Lock()
	defer c.lock.Unlock()
	item, exists := c.items[key]
	if !exists {
		item = c.newReflectorLocked(namespace, name)
		c.items[key] = item
	}
	item.refCount++
}
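
// DeleteReference drops one reference to the given object and stops the
// underlying reflector once no registered pod references it anymore.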
func (c *objectCache) DeleteReference(namespace, name string) {
	key := objectKey{namespace: namespace, name: name}

	c.lock.Lock()
	defer c.lock.Unlock()
	if item, ok := c.items[key]; ok {
		item.refCount--
		if item.refCount == 0 {
			// Stop the underlying reflector.
			item.stop()
			delete(c.items, key)
		}
	}
}

// key returns the key of an object with a given name and namespace.
// This has to be in sync with cache.MetaNamespaceKeyFunc.
func (c *objectCache) key(namespace, name string) string {
	if len(namespace) > 0 {
		return namespace + "/" + name
	}
	return name
}

func (c *objectCache) isStopped() bool {
	c.lock.RLock()
	defer c.lock.RUnlock()
	return c.stopped
}
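
// Get returns the registered object from the local cache, waiting for the
// underlying reflector to sync if necessary. For immutable objects it stops
// the watch after the first successful read, since the object can no longer change.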
func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
	key := objectKey{namespace: namespace, name: name}

	c.lock.RLock()
	item, exists := c.items[key]
	c.lock.RUnlock()

	if !exists {
		return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
	}
	// Record the last access time regardless of whether the Get succeeds.
	// This protects against premature (racy) reflector closure.
	item.setLastAccessTime(c.clock.Now())

	// Don't restart reflector if Kubelet is already shutting down.
	if !c.isStopped() {
		item.restartReflectorIfNeeded()
	}
	if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
		return nil, fmt.Errorf("failed to sync %s cache: %v", c.groupResource.String(), err)
	}
	obj, exists, err := item.store.GetByKey(c.key(namespace, name))
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, apierrors.NewNotFound(c.groupResource, name)
	}
	if object, ok := obj.(runtime.Object); ok {
		// If the returned object is immutable, stop the reflector.
		//
		// NOTE: we may potentially not even start the reflector if the object is
		// already immutable. However, given that:
		// - we want to also handle the case when the object is marked immutable later
		// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
		// - doing that wouldn't provide a visible scalability/performance gain - we
		//   already have it from here
		// - doing that would require significant refactoring of the reflector
		// we limit ourselves to just quickly stopping the reflector here.
		if c.isImmutable(object) {
			item.setImmutable()
			if item.stop() {
				klog.V(4).InfoS("Stopped watching for changes - object is immutable", "obj", klog.KRef(namespace, name))
			}
		}
		return object, nil
	}
	return nil, fmt.Errorf("unexpected object type: %v", obj)
}
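
// startRecycleIdleWatch periodically stops reflectors for objects that have
// not been accessed for longer than maxIdleTime.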
func (c *objectCache) startRecycleIdleWatch() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for key, item := range c.items {
		if item.stopIfIdle(c.clock.Now(), c.maxIdleTime) {
			klog.V(4).InfoS("Not accessed for a long time, stopped watching for changes", "objectKey", key, "maxIdleTime", c.maxIdleTime)
		}
	}
}
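
// shutdownWhenStopped stops all reflectors and marks the cache as stopped
// once the provided stop channel is closed.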
func (c *objectCache) shutdownWhenStopped(stopCh <-chan struct{}) {
	<-stopCh

	c.lock.Lock()
	defer c.lock.Unlock()

	c.stopped = true
	for _, item := range c.items {
		item.stop()
	}
}

// NewWatchBasedManager creates a manager that keeps a cache of all objects
// necessary for registered pods.
// It implements the following logic:
// - whenever a pod is created or updated, we start individual watches for all
//   referenced objects that aren't referenced from other registered pods
// - every GetObject() returns a value from the local cache propagated via watches
func NewWatchBasedManager(
	listObject listObjectFunc,
	watchObject watchObjectFunc,
	newObject newObjectFunc,
	isImmutable isImmutableFunc,
	groupResource schema.GroupResource,
	resyncInterval time.Duration,
	getReferencedObjects func(*v1.Pod) sets.String) Manager {

	// If a configmap/secret is used as a volume, the volumeManager will visit the
	// objectCacheItem every resyncInterval cycle anyway; we only want to stop
	// objectCacheItems that are referenced exclusively by environment variables.
	// Therefore maxIdleTime is set to an integer multiple of resyncInterval,
	// currently 5 times.
	maxIdleTime := resyncInterval * 5

	// TODO propagate stopCh from the higher level.
	objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
	return NewCacheBasedManager(objectStore, getReferencedObjects)
}
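
// Illustrative usage sketch (not part of this package's API surface): a caller
// such as the kubelet secret manager could wire up NewWatchBasedManager roughly
// as below. The kubeClient variable and the getSecretNames helper are
// assumptions for illustration only, not definitions from this file.
//
//	listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
//		return kubeClient.CoreV1().Secrets(namespace).List(context.TODO(), opts)
//	}
//	watchSecret := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
//		return kubeClient.CoreV1().Secrets(namespace).Watch(context.TODO(), opts)
//	}
//	newSecret := func() runtime.Object { return &v1.Secret{} }
//	isImmutable := func(obj runtime.Object) bool {
//		if s, ok := obj.(*v1.Secret); ok {
//			return s.Immutable != nil && *s.Immutable
//		}
//		return false
//	}
//	mgr := NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable,
//		v1.Resource("secret"), time.Minute, getSecretNames)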