Enable garbage collection of custom resources

Enhance the garbage collector to periodically refresh the resources it
monitors (via discovery) to enable custom resource definition GC.

This implementation caches Unstructured structs for any kinds not
covered by a shared informer. The existing meta-only codec only supports
compiled types; an improved codec which supports arbitrary types could
be introduced to optimize caching to store only metadata for all
non-informer types.
This commit is contained in:
Dan Mace
2017-05-17 15:54:58 -07:00
parent 3d3d3922c2
commit d08dfb92c7
16 changed files with 856 additions and 235 deletions

View File

@@ -42,6 +42,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/discovery:go_default_library",
"//vendor/k8s.io/client-go/dynamic:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",

View File

@@ -20,8 +20,6 @@ import (
"fmt"
)
const nonCoreMessage = `If %s is a non-core resource (e.g. thirdparty resource, custom resource from aggregated apiserver), please note that the garbage collector doesn't support non-core resources yet. Once they are supported, object with ownerReferences referring non-existing non-core objects will be deleted by the garbage collector.`
type restMappingError struct {
kind string
version string
@@ -36,7 +34,6 @@ func (r *restMappingError) Error() string {
func (r *restMappingError) Message() string {
versionKind := fmt.Sprintf("%s/%s", r.version, r.kind)
errMsg := fmt.Sprintf("unable to get REST mapping for %s. ", versionKind)
errMsg += fmt.Sprintf(nonCoreMessage, versionKind)
errMsg += fmt.Sprintf(" If %s is an invalid resource, then you should manually remove ownerReferences that refer %s objects.", versionKind, versionKind)
return errMsg
}

View File

@@ -32,6 +32,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"
@@ -73,6 +74,8 @@ type GarbageCollector struct {
// GC caches the owners that do not exist according to the API server.
absentOwnerCache *UIDCache
sharedInformers informers.SharedInformerFactory
workerLock sync.RWMutex
}
func NewGarbageCollector(
@@ -108,14 +111,24 @@ func NewGarbageCollector(
sharedInformers: sharedInformers,
ignoredResources: ignoredResources,
}
if err := gb.monitorsForResources(deletableResources); err != nil {
return nil, err
if err := gb.syncMonitors(deletableResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err))
}
gc.dependencyGraphBuilder = gb
return gc, nil
}
// resyncMonitors starts or stops resource monitors as needed to ensure that all
// (and only) those resources present in the map are monitored.
func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error {
if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil {
return err
}
gc.dependencyGraphBuilder.startMonitors()
return nil
}
func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer gc.attemptToDelete.ShutDown()
@@ -125,9 +138,9 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
glog.Infof("Starting garbage collector controller")
defer glog.Infof("Shutting down garbage collector controller")
gc.dependencyGraphBuilder.Run(stopCh)
go gc.dependencyGraphBuilder.Run(stopCh)
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.HasSynced) {
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
return
}
@@ -142,8 +155,43 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
<-stopCh
}
func (gc *GarbageCollector) HasSynced() bool {
return gc.dependencyGraphBuilder.HasSynced()
// resettableRESTMapper is a RESTMapper which is capable of resetting itself
// from discovery.
type resettableRESTMapper interface {
meta.RESTMapper
Reset()
}
// Sync periodically resyncs the garbage collector monitors with resources
// returned found via the discoveryClient. Sync blocks, continuing to sync until
// a message is received on stopCh.
//
// The discoveryClient should be the same client which underlies restMapper.
func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
wait.Until(func() {
// Ensure workers are paused to avoid processing events before informers
// have resynced.
gc.workerLock.Lock()
defer gc.workerLock.Unlock()
restMapper.Reset()
deletableResources, err := GetDeletableResources(discoveryClient)
if err != nil {
utilruntime.HandleError(err)
return
}
if err := gc.resyncMonitors(deletableResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return
}
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
}
}, period, stopCh)
}
func (gc *GarbageCollector) IsSynced() bool {
return gc.dependencyGraphBuilder.IsSynced()
}
func (gc *GarbageCollector) runAttemptToDeleteWorker() {
@@ -153,6 +201,8 @@ func (gc *GarbageCollector) runAttemptToDeleteWorker() {
func (gc *GarbageCollector) attemptToDeleteWorker() bool {
item, quit := gc.attemptToDelete.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
@@ -164,13 +214,18 @@ func (gc *GarbageCollector) attemptToDeleteWorker() bool {
}
err := gc.attemptToDeleteItem(n)
if err != nil {
// TODO: remove this block when gc starts using dynamic RESTMapper.
if restMappingError, ok := err.(*restMappingError); ok {
utilruntime.HandleError(fmt.Errorf("Ignore syncing item %#v: %s", n, restMappingError.Message()))
// The RESTMapper is static, so no need to retry, otherwise we'll get the same error.
return true
if _, ok := err.(*restMappingError); ok {
// There are at least two ways this can happen:
// 1. The reference is to an object of a custom type that has not yet been
// recognized by gc.restMapper (this is a transient error).
// 2. The reference is to an invalid group/version. We don't currently
// have a way to distinguish this from a valid type we will recognize
// after the next discovery sync.
// For now, record the error and retry.
glog.V(5).Infof("error syncing item %s: %v", n, err)
} else {
utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
}
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", n, err))
// retry if garbage collection of an object failed.
gc.attemptToDelete.AddRateLimited(item)
}
@@ -454,6 +509,8 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker() {
// these steps fail.
func (gc *GarbageCollector) attemptToOrphanWorker() bool {
item, quit := gc.attemptToOrphan.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
if quit {
return false
}
@@ -498,3 +555,19 @@ func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool {
}
return false
}
// GetDeletableResources returns all resources from discoveryClient that the
// garbage collector should recognize and work with. More specifically, all
// preferred resources which support the 'delete' verb.
func GetDeletableResources(discoveryClient discovery.DiscoveryInterface) (map[schema.GroupVersionResource]struct{}, error) {
preferredResources, err := discoveryClient.ServerPreferredResources()
if err != nil {
return nil, fmt.Errorf("failed to get supported resources from server: %v", err)
}
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources)
deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources)
if err != nil {
return nil, fmt.Errorf("Failed to parse resources from server: %v", err)
}
return deletableGroupVersionResources, nil
}

View File

@@ -46,25 +46,62 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
func TestNewGarbageCollector(t *testing.T) {
func TestGarbageCollectorConstruction(t *testing.T) {
config := &restclient.Config{}
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
tweakableRM := meta.NewDefaultRESTMapper(nil, nil)
rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}
metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc)
clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
podResource := map[schema.GroupVersionResource]struct{}{
{Version: "v1", Resource: "pods"}: {},
// no monitor will be constructed for non-core resource, the GC construction will not fail.
}
twoResources := map[schema.GroupVersionResource]struct{}{
{Version: "v1", Resource: "pods"}: {},
{Group: "tpr.io", Version: "v1", Resource: "unknown"}: {},
}
client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers)
// No monitor will be constructed for the non-core resource, but the GC
// construction will not fail.
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
// Make sure resource monitor syncing creates and stops resource monitors.
tweakableRM.Add(schema.GroupVersionKind{Group: "tpr.io", Version: "v1", Kind: "unknown"}, nil)
err = gc.resyncMonitors(twoResources)
if err != nil {
t.Errorf("Failed adding a monitor: %v", err)
}
assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
err = gc.resyncMonitors(podResource)
if err != nil {
t.Errorf("Failed removing a monitor: %v", err)
}
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
// Make sure the syncing mechanism also works after Run() has been called
stopCh := make(chan struct{})
defer close(stopCh)
go gc.Run(1, stopCh)
err = gc.resyncMonitors(twoResources)
if err != nil {
t.Errorf("Failed adding a monitor: %v", err)
}
assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors))
err = gc.resyncMonitors(podResource)
if err != nil {
t.Errorf("Failed removing a monitor: %v", err)
}
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
}
// fakeAction records information about requests to aid in testing.

View File

@@ -134,6 +134,14 @@ func (n *node) blockingDependents() []*node {
return ret
}
// String renders node as a string using fmt. Acquires a read lock to ensure the
// reflective dump of dependents doesn't race with any concurrent writes.
func (n *node) String() string {
n.dependentsLock.RLock()
defer n.dependentsLock.RUnlock()
return fmt.Sprintf("%#v", n)
}
type concurrentUIDToNode struct {
uidToNodeLock sync.RWMutex
uidToNode map[types.UID]*node

View File

@@ -19,6 +19,7 @@ package garbagecollector
import (
"fmt"
"reflect"
"sync"
"time"
"github.com/golang/glog"
@@ -27,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
@@ -71,9 +73,18 @@ type event struct {
// items to the attemptToDelete and attemptToOrphan.
type GraphBuilder struct {
restMapper meta.RESTMapper
// each monitor list/watches a resource, the results are funneled to the
// dependencyGraphBuilder
monitors []cache.Controller
monitors monitors
monitorLock sync.Mutex
// stopCh drives shutdown. If it is nil, it indicates that Run() has not been
// called yet. If it is non-nil, then when closed it indicates everything
// should shut down.
//
// This channel is also protected by monitorLock.
stopCh <-chan struct{}
// metaOnlyClientPool uses a special codec, which removes fields except for
// apiVersion, kind, and metadata during decoding.
metaOnlyClientPool dynamic.ClientPool
@@ -93,10 +104,26 @@ type GraphBuilder struct {
// be non-existent are added to the cached.
absentOwnerCache *UIDCache
sharedInformers informers.SharedInformerFactory
stopCh <-chan struct{}
ignoredResources map[schema.GroupResource]struct{}
}
// monitor runs a Controller with a local stop channel.
type monitor struct {
controller cache.Controller
// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
// not yet started.
stopCh chan struct{}
}
// Run is intended to be called in a goroutine. Multiple calls of this is an
// error.
func (m *monitor) Run() {
m.controller.Run(m.stopCh)
}
type monitors map[schema.GroupVersionResource]*monitor
func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@@ -157,19 +184,11 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
gb.graphChanges.Add(event)
},
}
shared, err := gb.sharedInformers.ForResource(resource)
if err == nil {
glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String())
// need to clone because it's from a shared cache
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
if gb.stopCh != nil {
// if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this
// means we've re-loaded and re-read discovery and we are adding a new monitor for a
// previously unseen resource, so we need to call Start on the shared informers again (this
// will only start those shared informers that have not yet been started).
go gb.sharedInformers.Start(gb.stopCh)
}
return shared.Informer().GetController(), nil
} else {
glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err)
@@ -181,6 +200,7 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
if err != nil {
return nil, err
}
// TODO: since the gv is never unregistered, isn't this a memory leak?
gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
_, monitor := cache.NewInformer(
listWatcher(client, resource),
@@ -192,44 +212,134 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind
return monitor, nil
}
func (gb *GraphBuilder) monitorsForResources(resources map[schema.GroupVersionResource]struct{}) error {
// syncMonitors rebuilds the monitor set according to the supplied resources,
// creating or deleting monitors as necessary. It will return any error
// encountered, but will make an attempt to create a monitor for each resource
// instead of immediately exiting on an error. It may be called before or after
// Run. Monitors are NOT started as part of the sync. To ensure all existing
// monitors are started, call startMonitors.
func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
toRemove := gb.monitors
if toRemove == nil {
toRemove = monitors{}
}
current := monitors{}
errs := []error{}
kept := 0
added := 0
for resource := range resources {
if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
glog.V(5).Infof("ignore resource %#v", resource)
if _, ok := ignoredResources[resource.GroupResource()]; ok {
continue
}
if m, ok := toRemove[resource]; ok {
current[resource] = m
delete(toRemove, resource)
kept++
continue
}
kind, err := gb.restMapper.KindFor(resource)
if err != nil {
nonCoreMsg := fmt.Sprintf(nonCoreMessage, resource)
utilruntime.HandleError(fmt.Errorf("%v. %s", err, nonCoreMsg))
errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
continue
}
monitor, err := gb.controllerFor(resource, kind)
c, err := gb.controllerFor(resource, kind)
if err != nil {
return err
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
continue
}
gb.monitors = append(gb.monitors, monitor)
current[resource] = &monitor{controller: c}
added++
}
return nil
gb.monitors = current
for _, monitor := range toRemove {
if monitor.stopCh != nil {
close(monitor.stopCh)
}
}
glog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
// NewAggregate returns nil if errs is 0-length
return utilerrors.NewAggregate(errs)
}
func (gb *GraphBuilder) HasSynced() bool {
// startMonitors ensures the current set of monitors are running. Any newly
// started monitors will also cause shared informers to be started.
//
// If called before Run, startMonitors does nothing (as there is no stop channel
// to support monitor/informer execution).
func (gb *GraphBuilder) startMonitors() {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
if gb.stopCh == nil {
return
}
monitors := gb.monitors
started := 0
for _, monitor := range monitors {
if monitor.stopCh == nil {
monitor.stopCh = make(chan struct{})
gb.sharedInformers.Start(gb.stopCh)
go monitor.Run()
started++
}
}
glog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors))
}
// IsSynced returns true if any monitors exist AND all those monitors'
// controllers HasSynced functions return true. This means IsSynced could return
// true at one time, and then later return false if all monitors were
// reconstructed.
func (gb *GraphBuilder) IsSynced() bool {
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
if len(gb.monitors) == 0 {
return false
}
for _, monitor := range gb.monitors {
if !monitor.HasSynced() {
if !monitor.controller.HasSynced() {
return false
}
}
return true
}
// Run sets the stop channel and starts monitor execution until stopCh is
// closed. Any running monitors will be stopped before Run returns.
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
for _, monitor := range gb.monitors {
go monitor.Run(stopCh)
}
go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
glog.Infof("GraphBuilder running")
defer glog.Infof("GraphBuilder stopping")
// set this so that we can use it if we need to start new shared informers
// Set up the stop channel.
gb.monitorLock.Lock()
gb.stopCh = stopCh
gb.monitorLock.Unlock()
// Start monitors and begin change processing until the stop channel is
// closed.
gb.startMonitors()
wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
// Stop any running monitors.
gb.monitorLock.Lock()
defer gb.monitorLock.Unlock()
monitors := gb.monitors
stopped := 0
for _, monitor := range monitors {
if monitor.stopCh != nil {
stopped++
close(monitor.stopCh)
}
}
glog.Infof("stopped %d of %d monitors", stopped, len(monitors))
}
var ignoredResources = map[schema.GroupResource]struct{}{