diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 330bf24d0a7..9c3b6ef699a 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -60,7 +60,6 @@ go_library( "//pkg/controller/disruption:go_default_library", "//pkg/controller/endpoint:go_default_library", "//pkg/controller/garbagecollector:go_default_library", - "//pkg/controller/garbagecollector/metaonly:go_default_library", "//pkg/controller/job:go_default_library", "//pkg/controller/namespace:go_default_library", "//pkg/controller/node:go_default_library", @@ -109,14 +108,15 @@ go_library( "//vendor/github.com/spf13/cobra:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index afe580f0863..5dd170140af 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -29,18 +29,17 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/controller" endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/garbagecollector" - "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace" nodecontroller "k8s.io/kubernetes/pkg/controller/node" "k8s.io/kubernetes/pkg/controller/podgc" @@ -297,47 +296,50 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) { return false, nil } - // TODO: should use a dynamic RESTMapper built from the discovery results. 
- restMapper := api.Registry.RESTMapper() - gcClientset := ctx.ClientBuilder.ClientOrDie("generic-garbage-collector") - preferredResources, err := gcClientset.Discovery().ServerPreferredResources() - if err != nil { - utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err)) - } - if len(preferredResources) == 0 { - return true, fmt.Errorf("unable to get any supported resources from server: %v", err) - } - deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"get", "list", "watch", "patch", "update", "delete"}}, preferredResources) - deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) - if err != nil { - return true, fmt.Errorf("Failed to parse resources from server: %v", err) - } + + // Use a discovery client capable of being refreshed. + discoveryClient := cacheddiscovery.NewMemCacheClient(gcClientset.Discovery()) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured) + restMapper.Reset() config := ctx.ClientBuilder.ConfigOrDie("generic-garbage-collector") - config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig = dynamic.ContentConfig() + // TODO: Make NewMetadataCodecFactory support arbitrary (non-compiled) + // resource types. Otherwise we'll be storing full Unstructured data in our + // caches for custom resources. Consider porting it to work with + // metav1alpha1.PartialObjectMetadata. + metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) + // Get an initial set of deletable resources to prime the garbage collector. + deletableResources, err := garbagecollector.GetDeletableResources(discoveryClient) + if err != nil { + return true, err + } ignoredResources := make(map[schema.GroupResource]struct{}) for _, r := range ctx.Options.GCIgnoredResources { ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{} } - garbageCollector, err := garbagecollector.NewGarbageCollector( metaOnlyClientPool, clientPool, restMapper, - deletableGroupVersionResources, + deletableResources, ignoredResources, ctx.InformerFactory, ) if err != nil { return true, fmt.Errorf("Failed to start the generic garbage collector: %v", err) } + + // Start the garbage collector. workers := int(ctx.Options.ConcurrentGCSyncs) go garbageCollector.Run(workers, ctx.Stop) + // Periodically refresh the RESTMapper with new discovery information and sync + // the garbage collector. 
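+	// Note that Sync holds the garbage collector's worker lock while it swaps
+	// monitors and waits for their caches to fill, so workers pause briefly
+	// during each resync.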
+ go garbageCollector.Sync(restMapper, discoveryClient, 30*time.Second, ctx.Stop) + return true, nil } diff --git a/hack/make-rules/test-cmd-util.sh b/hack/make-rules/test-cmd-util.sh index b3da28844e8..7490b6525f3 100644 --- a/hack/make-rules/test-cmd-util.sh +++ b/hack/make-rules/test-cmd-util.sh @@ -1647,7 +1647,7 @@ run_non_native_resource_tests() { kubectl "${kube_flags[@]}" delete bars test --cascade=false # Make sure it's gone - kube::test::get_object_assert bars "{{range.items}}{{$id_field}}:{{end}}" '' + kube::test::wait_object_assert bars "{{range.items}}{{$id_field}}:{{end}}" '' # Test that we can create single item via apply kubectl "${kube_flags[@]}" apply -f hack/testdata/TPR/foo.yaml diff --git a/hack/make-rules/test-integration.sh b/hack/make-rules/test-integration.sh index dbe7efc3616..3bf6023df83 100755 --- a/hack/make-rules/test-integration.sh +++ b/hack/make-rules/test-integration.sh @@ -71,7 +71,7 @@ runTests() { make -C "${KUBE_ROOT}" test \ WHAT="${WHAT:-$(kube::test::find_integration_test_dirs | paste -sd' ' -)}" \ GOFLAGS="${GOFLAGS:-}" \ - KUBE_TEST_ARGS="${KUBE_TEST_ARGS:-} ${SHORT:--short=true} --vmodule=garbage*collector*=6 --alsologtostderr=true" \ + KUBE_TEST_ARGS="${KUBE_TEST_ARGS:-} ${SHORT:--short=true} --alsologtostderr=true" \ KUBE_RACE="" \ KUBE_TIMEOUT="${KUBE_TIMEOUT}" \ KUBE_TEST_API_VERSIONS="$1" diff --git a/pkg/controller/garbagecollector/BUILD b/pkg/controller/garbagecollector/BUILD index 51568904605..9e83bf044bb 100644 --- a/pkg/controller/garbagecollector/BUILD +++ b/pkg/controller/garbagecollector/BUILD @@ -42,6 +42,7 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", + "//vendor/k8s.io/client-go/discovery:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", diff --git a/pkg/controller/garbagecollector/errors.go b/pkg/controller/garbagecollector/errors.go index e73cb6f0be3..cb13b7d40c9 100644 --- a/pkg/controller/garbagecollector/errors.go +++ b/pkg/controller/garbagecollector/errors.go @@ -20,8 +20,6 @@ import ( "fmt" ) -const nonCoreMessage = `If %s is a non-core resource (e.g. thirdparty resource, custom resource from aggregated apiserver), please note that the garbage collector doesn't support non-core resources yet. Once they are supported, object with ownerReferences referring non-existing non-core objects will be deleted by the garbage collector.` - type restMappingError struct { kind string version string @@ -36,7 +34,6 @@ func (r *restMappingError) Error() string { func (r *restMappingError) Message() string { versionKind := fmt.Sprintf("%s/%s", r.version, r.kind) errMsg := fmt.Sprintf("unable to get REST mapping for %s. 
", versionKind) - errMsg += fmt.Sprintf(nonCoreMessage, versionKind) errMsg += fmt.Sprintf(" If %s is an invalid resource, then you should manually remove ownerReferences that refer %s objects.", versionKind, versionKind) return errMsg } diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index 79aed2ff331..96355c2b7ac 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -32,6 +32,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" "k8s.io/client-go/util/workqueue" @@ -73,6 +74,8 @@ type GarbageCollector struct { // GC caches the owners that do not exist according to the API server. absentOwnerCache *UIDCache sharedInformers informers.SharedInformerFactory + + workerLock sync.RWMutex } func NewGarbageCollector( @@ -108,14 +111,24 @@ func NewGarbageCollector( sharedInformers: sharedInformers, ignoredResources: ignoredResources, } - if err := gb.monitorsForResources(deletableResources); err != nil { - return nil, err + if err := gb.syncMonitors(deletableResources); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to sync all monitors: %v", err)) } gc.dependencyGraphBuilder = gb return gc, nil } +// resyncMonitors starts or stops resource monitors as needed to ensure that all +// (and only) those resources present in the map are monitored. +func (gc *GarbageCollector) resyncMonitors(deletableResources map[schema.GroupVersionResource]struct{}) error { + if err := gc.dependencyGraphBuilder.syncMonitors(deletableResources); err != nil { + return err + } + gc.dependencyGraphBuilder.startMonitors() + return nil +} + func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer gc.attemptToDelete.ShutDown() @@ -125,9 +138,9 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting garbage collector controller") defer glog.Infof("Shutting down garbage collector controller") - gc.dependencyGraphBuilder.Run(stopCh) + go gc.dependencyGraphBuilder.Run(stopCh) - if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.HasSynced) { + if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) { return } @@ -142,8 +155,43 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { <-stopCh } -func (gc *GarbageCollector) HasSynced() bool { - return gc.dependencyGraphBuilder.HasSynced() +// resettableRESTMapper is a RESTMapper which is capable of resetting itself +// from discovery. +type resettableRESTMapper interface { + meta.RESTMapper + Reset() +} + +// Sync periodically resyncs the garbage collector monitors with resources +// returned found via the discoveryClient. Sync blocks, continuing to sync until +// a message is received on stopCh. +// +// The discoveryClient should be the same client which underlies restMapper. +func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { + wait.Until(func() { + // Ensure workers are paused to avoid processing events before informers + // have resynced. 
+ gc.workerLock.Lock() + defer gc.workerLock.Unlock() + + restMapper.Reset() + deletableResources, err := GetDeletableResources(discoveryClient) + if err != nil { + utilruntime.HandleError(err) + return + } + if err := gc.resyncMonitors(deletableResources); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) + return + } + if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) { + utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync")) + } + }, period, stopCh) +} + +func (gc *GarbageCollector) IsSynced() bool { + return gc.dependencyGraphBuilder.IsSynced() } func (gc *GarbageCollector) runAttemptToDeleteWorker() { @@ -153,6 +201,8 @@ func (gc *GarbageCollector) runAttemptToDeleteWorker() { func (gc *GarbageCollector) attemptToDeleteWorker() bool { item, quit := gc.attemptToDelete.Get() + gc.workerLock.RLock() + defer gc.workerLock.RUnlock() if quit { return false } @@ -164,13 +214,18 @@ func (gc *GarbageCollector) attemptToDeleteWorker() bool { } err := gc.attemptToDeleteItem(n) if err != nil { - // TODO: remove this block when gc starts using dynamic RESTMapper. - if restMappingError, ok := err.(*restMappingError); ok { - utilruntime.HandleError(fmt.Errorf("Ignore syncing item %#v: %s", n, restMappingError.Message())) - // The RESTMapper is static, so no need to retry, otherwise we'll get the same error. - return true + if _, ok := err.(*restMappingError); ok { + // There are at least two ways this can happen: + // 1. The reference is to an object of a custom type that has not yet been + // recognized by gc.restMapper (this is a transient error). + // 2. The reference is to an invalid group/version. We don't currently + // have a way to distinguish this from a valid type we will recognize + // after the next discovery sync. + // For now, record the error and retry. + glog.V(5).Infof("error syncing item %s: %v", n, err) + } else { + utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err)) } - utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", n, err)) // retry if garbage collection of an object failed. gc.attemptToDelete.AddRateLimited(item) } @@ -454,6 +509,8 @@ func (gc *GarbageCollector) runAttemptToOrphanWorker() { // these steps fail. func (gc *GarbageCollector) attemptToOrphanWorker() bool { item, quit := gc.attemptToOrphan.Get() + gc.workerLock.RLock() + defer gc.workerLock.RUnlock() if quit { return false } @@ -498,3 +555,19 @@ func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool { } return false } + +// GetDeletableResources returns all resources from discoveryClient that the +// garbage collector should recognize and work with. More specifically, all +// preferred resources which support the 'delete' verb. 
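+//
+// Discovery failures are returned to the caller: at controller start-up they
+// abort garbage collector construction, while the periodic Sync loop only
+// logs them and retries on the next period.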
+func GetDeletableResources(discoveryClient discovery.DiscoveryInterface) (map[schema.GroupVersionResource]struct{}, error) { + preferredResources, err := discoveryClient.ServerPreferredResources() + if err != nil { + return nil, fmt.Errorf("failed to get supported resources from server: %v", err) + } + deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources) + deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) + if err != nil { + return nil, fmt.Errorf("Failed to parse resources from server: %v", err) + } + return deletableGroupVersionResources, nil +} diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index db5280274a5..37fcd20069a 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -46,25 +46,62 @@ import ( "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" ) -func TestNewGarbageCollector(t *testing.T) { +func TestGarbageCollectorConstruction(t *testing.T) { config := &restclient.Config{} config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) + tweakableRM := meta.NewDefaultRESTMapper(nil, nil) + rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()} + metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil - clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) + clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) podResource := map[schema.GroupVersionResource]struct{}{ {Version: "v1", Resource: "pods"}: {}, - // no monitor will be constructed for non-core resource, the GC construction will not fail. + } + twoResources := map[schema.GroupVersionResource]struct{}{ + {Version: "v1", Resource: "pods"}: {}, {Group: "tpr.io", Version: "v1", Resource: "unknown"}: {}, } - client := fake.NewSimpleClientset() sharedInformers := informers.NewSharedInformerFactory(client, 0) - gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers) + + // No monitor will be constructed for the non-core resource, but the GC + // construction will not fail. + gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, rm, twoResources, map[schema.GroupResource]struct{}{}, sharedInformers) if err != nil { t.Fatal(err) } assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors)) + + // Make sure resource monitor syncing creates and stops resource monitors. 
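+	// The tweakable RESTMapper has to know about the custom kind before
+	// resyncMonitors can resolve it and construct a monitor for it.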
+ tweakableRM.Add(schema.GroupVersionKind{Group: "tpr.io", Version: "v1", Kind: "unknown"}, nil) + err = gc.resyncMonitors(twoResources) + if err != nil { + t.Errorf("Failed adding a monitor: %v", err) + } + assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors)) + + err = gc.resyncMonitors(podResource) + if err != nil { + t.Errorf("Failed removing a monitor: %v", err) + } + assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors)) + + // Make sure the syncing mechanism also works after Run() has been called + stopCh := make(chan struct{}) + defer close(stopCh) + go gc.Run(1, stopCh) + + err = gc.resyncMonitors(twoResources) + if err != nil { + t.Errorf("Failed adding a monitor: %v", err) + } + assert.Equal(t, 2, len(gc.dependencyGraphBuilder.monitors)) + + err = gc.resyncMonitors(podResource) + if err != nil { + t.Errorf("Failed removing a monitor: %v", err) + } + assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors)) } // fakeAction records information about requests to aid in testing. diff --git a/pkg/controller/garbagecollector/graph.go b/pkg/controller/garbagecollector/graph.go index 5ced32b7a97..282256cbdba 100644 --- a/pkg/controller/garbagecollector/graph.go +++ b/pkg/controller/garbagecollector/graph.go @@ -134,6 +134,14 @@ func (n *node) blockingDependents() []*node { return ret } +// String renders node as a string using fmt. Acquires a read lock to ensure the +// reflective dump of dependents doesn't race with any concurrent writes. +func (n *node) String() string { + n.dependentsLock.RLock() + defer n.dependentsLock.RUnlock() + return fmt.Sprintf("%#v", n) +} + type concurrentUIDToNode struct { uidToNodeLock sync.RWMutex uidToNode map[types.UID]*node diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go index 726d0224a3a..babe81bf725 100644 --- a/pkg/controller/garbagecollector/graph_builder.go +++ b/pkg/controller/garbagecollector/graph_builder.go @@ -19,6 +19,7 @@ package garbagecollector import ( "fmt" "reflect" + "sync" "time" "github.com/golang/glog" @@ -27,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" @@ -71,9 +73,18 @@ type event struct { // items to the attemptToDelete and attemptToOrphan. type GraphBuilder struct { restMapper meta.RESTMapper + // each monitor list/watches a resource, the results are funneled to the // dependencyGraphBuilder - monitors []cache.Controller + monitors monitors + monitorLock sync.Mutex + // stopCh drives shutdown. If it is nil, it indicates that Run() has not been + // called yet. If it is non-nil, then when closed it indicates everything + // should shut down. + // + // This channel is also protected by monitorLock. + stopCh <-chan struct{} + // metaOnlyClientPool uses a special codec, which removes fields except for // apiVersion, kind, and metadata during decoding. metaOnlyClientPool dynamic.ClientPool @@ -93,10 +104,26 @@ type GraphBuilder struct { // be non-existent are added to the cached. absentOwnerCache *UIDCache sharedInformers informers.SharedInformerFactory - stopCh <-chan struct{} ignoredResources map[schema.GroupResource]struct{} } +// monitor runs a Controller with a local stop channel. +type monitor struct { + controller cache.Controller + + // stopCh stops Controller. 
If stopCh is nil, the monitor is considered to be + // not yet started. + stopCh chan struct{} +} + +// Run is intended to be called in a goroutine. Multiple calls of this is an +// error. +func (m *monitor) Run() { + m.controller.Run(m.stopCh) +} + +type monitors map[schema.GroupVersionResource]*monitor + func listWatcher(client dynamic.Interface, resource schema.GroupVersionResource) *cache.ListWatch { return &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -157,19 +184,11 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind gb.graphChanges.Add(event) }, } - shared, err := gb.sharedInformers.ForResource(resource) if err == nil { glog.V(4).Infof("using a shared informer for resource %q, kind %q", resource.String(), kind.String()) // need to clone because it's from a shared cache shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime) - if gb.stopCh != nil { - // if gb.stopCh is set, it means we've already gotten past the initial gb.Run() call, so this - // means we've re-loaded and re-read discovery and we are adding a new monitor for a - // previously unseen resource, so we need to call Start on the shared informers again (this - // will only start those shared informers that have not yet been started). - go gb.sharedInformers.Start(gb.stopCh) - } return shared.Informer().GetController(), nil } else { glog.V(4).Infof("unable to use a shared informer for resource %q, kind %q: %v", resource.String(), kind.String(), err) @@ -181,6 +200,7 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind if err != nil { return nil, err } + // TODO: since the gv is never unregistered, isn't this a memory leak? gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") _, monitor := cache.NewInformer( listWatcher(client, resource), @@ -192,44 +212,134 @@ func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind return monitor, nil } -func (gb *GraphBuilder) monitorsForResources(resources map[schema.GroupVersionResource]struct{}) error { +// syncMonitors rebuilds the monitor set according to the supplied resources, +// creating or deleting monitors as necessary. It will return any error +// encountered, but will make an attempt to create a monitor for each resource +// instead of immediately exiting on an error. It may be called before or after +// Run. Monitors are NOT started as part of the sync. To ensure all existing +// monitors are started, call startMonitors. +func (gb *GraphBuilder) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error { + gb.monitorLock.Lock() + defer gb.monitorLock.Unlock() + + toRemove := gb.monitors + if toRemove == nil { + toRemove = monitors{} + } + current := monitors{} + errs := []error{} + kept := 0 + added := 0 for resource := range resources { - if _, ok := gb.ignoredResources[resource.GroupResource()]; ok { - glog.V(5).Infof("ignore resource %#v", resource) + if _, ok := ignoredResources[resource.GroupResource()]; ok { + continue + } + if m, ok := toRemove[resource]; ok { + current[resource] = m + delete(toRemove, resource) + kept++ continue } kind, err := gb.restMapper.KindFor(resource) if err != nil { - nonCoreMsg := fmt.Sprintf(nonCoreMessage, resource) - utilruntime.HandleError(fmt.Errorf("%v. 
%s", err, nonCoreMsg)) + errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err)) continue } - monitor, err := gb.controllerFor(resource, kind) + c, err := gb.controllerFor(resource, kind) if err != nil { - return err + errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err)) + continue } - gb.monitors = append(gb.monitors, monitor) + current[resource] = &monitor{controller: c} + added++ } - return nil + gb.monitors = current + + for _, monitor := range toRemove { + if monitor.stopCh != nil { + close(monitor.stopCh) + } + } + + glog.V(4).Infof("synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove)) + // NewAggregate returns nil if errs is 0-length + return utilerrors.NewAggregate(errs) } -func (gb *GraphBuilder) HasSynced() bool { +// startMonitors ensures the current set of monitors are running. Any newly +// started monitors will also cause shared informers to be started. +// +// If called before Run, startMonitors does nothing (as there is no stop channel +// to support monitor/informer execution). +func (gb *GraphBuilder) startMonitors() { + gb.monitorLock.Lock() + defer gb.monitorLock.Unlock() + + if gb.stopCh == nil { + return + } + + monitors := gb.monitors + started := 0 + for _, monitor := range monitors { + if monitor.stopCh == nil { + monitor.stopCh = make(chan struct{}) + gb.sharedInformers.Start(gb.stopCh) + go monitor.Run() + started++ + } + } + glog.V(4).Infof("started %d new monitors, %d currently running", started, len(monitors)) +} + +// IsSynced returns true if any monitors exist AND all those monitors' +// controllers HasSynced functions return true. This means IsSynced could return +// true at one time, and then later return false if all monitors were +// reconstructed. +func (gb *GraphBuilder) IsSynced() bool { + gb.monitorLock.Lock() + defer gb.monitorLock.Unlock() + + if len(gb.monitors) == 0 { + return false + } + for _, monitor := range gb.monitors { - if !monitor.HasSynced() { + if !monitor.controller.HasSynced() { return false } } return true } +// Run sets the stop channel and starts monitor execution until stopCh is +// closed. Any running monitors will be stopped before Run returns. func (gb *GraphBuilder) Run(stopCh <-chan struct{}) { - for _, monitor := range gb.monitors { - go monitor.Run(stopCh) - } - go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh) + glog.Infof("GraphBuilder running") + defer glog.Infof("GraphBuilder stopping") - // set this so that we can use it if we need to start new shared informers + // Set up the stop channel. + gb.monitorLock.Lock() gb.stopCh = stopCh + gb.monitorLock.Unlock() + + // Start monitors and begin change processing until the stop channel is + // closed. + gb.startMonitors() + wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh) + + // Stop any running monitors. 
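+	// Monitors with a nil stopCh were never started by startMonitors, so only
+	// the non-nil channels need to be closed here.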
+ gb.monitorLock.Lock() + defer gb.monitorLock.Unlock() + monitors := gb.monitors + stopped := 0 + for _, monitor := range monitors { + if monitor.stopCh != nil { + stopped++ + close(monitor.stopCh) + } + } + glog.Infof("stopped %d of %d monitors", stopped, len(monitors)) } var ignoredResources = map[schema.GroupResource]struct{}{ diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/start.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/start.go index b6fb709488f..a54bcd4c058 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/start.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/cmd/server/start.go @@ -105,13 +105,11 @@ func (o CustomResourceDefinitionsServerOptions) Config() (*apiserver.Config, err func NewCRDRESTOptionsGetter(etcdOptions genericoptions.EtcdOptions) genericregistry.RESTOptionsGetter { ret := apiserver.CRDRESTOptionsGetter{ - StorageConfig: etcdOptions.StorageConfig, - StoragePrefix: etcdOptions.StorageConfig.Prefix, - EnableWatchCache: etcdOptions.EnableWatchCache, - DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize, - // garbage collection for custom resources is forced off until GC works with CRs. - // When GC is enabled, this turns back into etcdOptions.EnableGarbageCollection - EnableGarbageCollection: false, + StorageConfig: etcdOptions.StorageConfig, + StoragePrefix: etcdOptions.StorageConfig.Prefix, + EnableWatchCache: etcdOptions.EnableWatchCache, + DefaultWatchCacheSize: etcdOptions.DefaultWatchCacheSize, + EnableGarbageCollection: etcdOptions.EnableGarbageCollection, DeleteCollectionWorkers: etcdOptions.DeleteCollectionWorkers, } ret.StorageConfig.Codec = unstructured.UnstructuredJSONScheme diff --git a/test/e2e/apimachinery/BUILD b/test/e2e/apimachinery/BUILD index 2ace019073f..31ef898ed15 100644 --- a/test/e2e/apimachinery/BUILD +++ b/test/e2e/apimachinery/BUILD @@ -20,6 +20,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/api:go_default_library", "//pkg/api/v1/pod:go_default_library", "//pkg/client/retry:go_default_library", "//pkg/controller:go_default_library", @@ -39,11 +40,13 @@ go_library( "//vendor/k8s.io/apiextensions-apiserver/test/integration/testserver:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1alpha1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", ], ) diff --git a/test/e2e/apimachinery/garbage_collector.go b/test/e2e/apimachinery/garbage_collector.go index 55c2b4f9ae6..f3d6f3b70f8 100644 --- a/test/e2e/apimachinery/garbage_collector.go +++ b/test/e2e/apimachinery/garbage_collector.go @@ -22,11 +22,17 @@ import ( "k8s.io/api/core/v1" v1beta1 "k8s.io/api/extensions/v1beta1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apiextensionstestserver "k8s.io/apiextensions-apiserver/test/integration/testserver" "k8s.io/apimachinery/pkg/api/errors" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/names" clientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/metrics" @@ -535,7 +541,11 @@ var _ = SIGDescribe("Garbage collector", func() { } By("wait for the rc to be deleted") // default client QPS is 20, deleting each pod requires 2 requests, so 30s should be enough - if err := wait.Poll(1*time.Second, 30*time.Second, func() (bool, error) { + // TODO: 30s is enough assuming immediate processing of dependents following + // owner deletion, but in practice there can be a long delay between owner + // deletion and dependent deletion processing. For now, increase the timeout + // and investigate the processing delay. + if err := wait.Poll(1*time.Second, 60*time.Second, func() (bool, error) { _, err := rcClient.Get(rc.Name, metav1.GetOptions{}) if err == nil { pods, _ := podClient.List(metav1.ListOptions{}) @@ -737,4 +747,106 @@ var _ = SIGDescribe("Garbage collector", func() { framework.Failf("failed to wait for all pods to be deleted: %v", err) } }) + + It("should support cascading deletion of custom resources", func() { + config, err := framework.LoadConfig() + if err != nil { + framework.Failf("failed to load config: %v", err) + } + + apiExtensionClient, err := apiextensionsclientset.NewForConfig(config) + if err != nil { + framework.Failf("failed to initialize apiExtensionClient: %v", err) + } + + // Create a random custom resource definition and ensure it's available for + // use. + definition := apiextensionstestserver.NewRandomNameCustomResourceDefinition(apiextensionsv1beta1.ClusterScoped) + defer func() { + err = apiextensionstestserver.DeleteCustomResourceDefinition(definition, apiExtensionClient) + if err != nil && !errors.IsNotFound(err) { + framework.Failf("failed to delete CustomResourceDefinition: %v", err) + } + }() + client, err := apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, f.ClientPool) + if err != nil { + framework.Failf("failed to create CustomResourceDefinition: %v", err) + } + + // Get a client for the custom resource. + resourceClient := client.Resource(&metav1.APIResource{ + Name: definition.Spec.Names.Plural, + Namespaced: false, + }, api.NamespaceNone) + apiVersion := definition.Spec.Group + "/" + definition.Spec.Version + + // Create a custom owner resource. + ownerName := names.SimpleNameGenerator.GenerateName("owner") + owner := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": definition.Spec.Names.Kind, + "metadata": map[string]interface{}{ + "name": ownerName, + }, + }, + } + persistedOwner, err := resourceClient.Create(owner) + if err != nil { + framework.Failf("failed to create owner resource %q: %v", ownerName, err) + } + framework.Logf("created owner resource %q", ownerName) + + // Create a custom dependent resource. 
+ dependentName := names.SimpleNameGenerator.GenerateName("dependent") + dependent := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": apiVersion, + "kind": definition.Spec.Names.Kind, + "metadata": map[string]interface{}{ + "name": dependentName, + "ownerReferences": []map[string]string{ + { + "uid": string(persistedOwner.GetUID()), + "apiVersion": apiVersion, + "kind": definition.Spec.Names.Kind, + "name": ownerName, + }, + }, + }, + }, + } + persistedDependent, err := resourceClient.Create(dependent) + if err != nil { + framework.Failf("failed to create dependent resource %q: %v", dependentName, err) + } + framework.Logf("created dependent resource %q", dependentName) + + // Delete the owner. + background := metav1.DeletePropagationBackground + err = resourceClient.Delete(ownerName, &metav1.DeleteOptions{PropagationPolicy: &background}) + if err != nil { + framework.Failf("failed to delete owner resource %q: %v", ownerName, err) + } + + // Ensure the dependent is deleted. + if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) { + _, err := resourceClient.Get(dependentName, metav1.GetOptions{}) + return errors.IsNotFound(err), nil + }); err != nil { + framework.Logf("owner: %#v", persistedOwner) + framework.Logf("dependent: %#v", persistedDependent) + framework.Failf("failed waiting for dependent resource %q to be deleted", dependentName) + } + + // Ensure the owner is deleted. + _, err = resourceClient.Get(ownerName, metav1.GetOptions{}) + if err == nil { + framework.Failf("expected owner resource %q to be deleted", ownerName) + } else { + if !errors.IsNotFound(err) { + framework.Failf("unexpected error getting owner resource %q: %v", ownerName, err) + } + } + }) }) diff --git a/test/integration/garbagecollector/BUILD b/test/integration/garbagecollector/BUILD index 36e7fcd12ff..cead86dd533 100644 --- a/test/integration/garbagecollector/BUILD +++ b/test/integration/garbagecollector/BUILD @@ -10,32 +10,32 @@ load( go_test( name = "go_default_test", size = "large", - srcs = [ - "garbage_collector_test.go", - "main_test.go", - ], + srcs = ["garbage_collector_test.go"], tags = [ "automanaged", "integration", ], deps = [ - "//pkg/api:go_default_library", + "//cmd/kube-apiserver/app/testing:go_default_library", "//pkg/controller/garbagecollector:go_default_library", - "//pkg/controller/garbagecollector/metaonly:go_default_library", "//test/integration:go_default_library", - "//test/integration/framework:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/coreos/pkg/capnslog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1:go_default_library", + "//vendor/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library", + "//vendor/k8s.io/apiextensions-apiserver/test/integration/testserver:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage/names:go_default_library", 
"//vendor/k8s.io/client-go/discovery:go_default_library", + "//vendor/k8s.io/client-go/discovery/cached:go_default_library", "//vendor/k8s.io/client-go/dynamic:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", - "//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index 9f82ac1f0b5..ee3f8389695 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -18,32 +18,34 @@ package garbagecollector import ( "fmt" - "net/http/httptest" "strconv" "strings" "sync" "testing" "time" - "github.com/golang/glog" - "k8s.io/api/core/v1" + apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apiextensionstestserver "k8s.io/apiextensions-apiserver/test/integration/testserver" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/discovery" + cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/api" + apitesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controller/garbagecollector" - "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" "k8s.io/kubernetes/test/integration" - "k8s.io/kubernetes/test/integration/framework" + + "github.com/coreos/pkg/capnslog" ) func getForegroundOptions() *metav1.DeleteOptions { @@ -124,63 +126,197 @@ func newOwnerRC(name, namespace string) *v1.ReplicationController { } } -func setup(t *testing.T, stop chan struct{}) (*httptest.Server, framework.CloseFunc, *garbagecollector.GarbageCollector, clientset.Interface) { - masterConfig := framework.NewIntegrationTestMasterConfig() - masterConfig.EnableCoreControllers = false - _, s, closeFn := framework.RunAMaster(masterConfig) +func newCRDInstance(definition *apiextensionsv1beta1.CustomResourceDefinition, namespace, name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "kind": definition.Spec.Names.Kind, + "apiVersion": definition.Spec.Group + "/" + definition.Spec.Version, + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + }, + }, + } +} - clientSet, err := clientset.NewForConfig(&restclient.Config{Host: s.URL}) - if err != nil { - t.Fatalf("Error in create clientset: %v", err) +func newConfigMap(namespace, name string) *v1.ConfigMap { + return &v1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: name, + }, } - preferredResources, err := clientSet.Discovery().ServerPreferredResources() +} + +func link(t *testing.T, owner, dependent metav1.Object) { + ownerType, err := meta.TypeAccessor(owner) if err != nil { - t.Fatalf("Failed to get supported 
resources from server: %v", err) + t.Fatalf("failed to get type info for %#v: %v", owner, err) } - deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources) - deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) + ref := metav1.OwnerReference{ + Kind: ownerType.GetKind(), + APIVersion: ownerType.GetAPIVersion(), + Name: owner.GetName(), + UID: owner.GetUID(), + } + dependent.SetOwnerReferences(append(dependent.GetOwnerReferences(), ref)) +} + +func createRandomCustomResourceDefinition( + t *testing.T, apiExtensionClient apiextensionsclientset.Interface, + clientPool dynamic.ClientPool, + namespace string, +) (*apiextensionsv1beta1.CustomResourceDefinition, dynamic.ResourceInterface) { + // Create a random custom resource definition and ensure it's available for + // use. + definition := apiextensionstestserver.NewRandomNameCustomResourceDefinition(apiextensionsv1beta1.NamespaceScoped) + + client, err := apiextensionstestserver.CreateNewCustomResourceDefinition(definition, apiExtensionClient, clientPool) if err != nil { - t.Fatalf("Failed to parse supported resources from server: %v", err) + t.Fatalf("failed to create CustomResourceDefinition: %v", err) } - config := &restclient.Config{Host: s.URL} - config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} - metaOnlyClientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - config.ContentConfig.NegotiatedSerializer = nil - clientPool := dynamic.NewClientPool(config, api.Registry.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) + + // Get a client for the custom resource. + resourceClient := client.Resource(&metav1.APIResource{ + Name: definition.Spec.Names.Plural, + Namespaced: true, + }, namespace) + return definition, resourceClient +} + +type testContext struct { + tearDown func() + gc *garbagecollector.GarbageCollector + clientSet clientset.Interface + apiExtensionClient apiextensionsclientset.Interface + clientPool dynamic.ClientPool + startGC func(workers int) + // syncPeriod is how often the GC started with startGC will be resynced. + syncPeriod time.Duration +} + +// if workerCount > 0, will start the GC, otherwise it's up to the caller to Run() the GC. +func setup(t *testing.T, workerCount int) *testContext { + masterConfig, tearDownMaster := apitesting.StartTestServerOrDie(t) + + // TODO: Disable logging here until we resolve teardown issues which result in + // massive log spam. Another path forward would be to refactor + // StartTestServerOrDie to work with the etcd instance already started by the + // integration test scripts. + // See https://github.com/kubernetes/kubernetes/issues/49489. + repo, err := capnslog.GetRepoLogger("github.com/coreos/etcd") + if err != nil { + t.Fatalf("couldn't configure logging: %v", err) + } + repo.SetLogLevel(map[string]capnslog.LogLevel{ + "etcdserver/api/v3rpc": capnslog.CRITICAL, + }) + + clientSet, err := clientset.NewForConfig(masterConfig) + if err != nil { + t.Fatalf("error creating clientset: %v", err) + } + + // Helpful stuff for testing CRD. + apiExtensionClient, err := apiextensionsclientset.NewForConfig(masterConfig) + if err != nil { + t.Fatalf("error creating extension clientset: %v", err) + } + // CreateNewCustomResourceDefinition wants to use this namespace for verifying + // namespace-scoped CRD creation. 
+ createNamespaceOrDie("aval", clientSet, t) + + discoveryClient := cacheddiscovery.NewMemCacheClient(clientSet.Discovery()) + restMapper := discovery.NewDeferredDiscoveryRESTMapper(discoveryClient, meta.InterfacesForUnstructured) + restMapper.Reset() + deletableResources, err := garbagecollector.GetDeletableResources(discoveryClient) + if err != nil { + t.Fatalf("unable to get deletable resources: %v", err) + } + config := *masterConfig + config.ContentConfig = dynamic.ContentConfig() + metaOnlyClientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc) + clientPool := dynamic.NewClientPool(&config, restMapper, dynamic.LegacyAPIPathResolverFunc) sharedInformers := informers.NewSharedInformerFactory(clientSet, 0) gc, err := garbagecollector.NewGarbageCollector( metaOnlyClientPool, clientPool, - api.Registry.RESTMapper(), - deletableGroupVersionResources, + restMapper, + deletableResources, garbagecollector.DefaultIgnoredResources(), sharedInformers, ) if err != nil { - t.Fatalf("Failed to create garbage collector") + t.Fatalf("failed to create garbage collector: %v", err) } - go sharedInformers.Start(stop) + stopCh := make(chan struct{}) + tearDown := func() { + close(stopCh) + tearDownMaster() + repo.SetLogLevel(map[string]capnslog.LogLevel{ + "etcdserver/api/v3rpc": capnslog.ERROR, + }) + } + syncPeriod := 5 * time.Second + startGC := func(workers int) { + go gc.Run(workers, stopCh) + go gc.Sync(restMapper, discoveryClient, syncPeriod, stopCh) + } - return s, closeFn, gc, clientSet + if workerCount > 0 { + startGC(workerCount) + } + + return &testContext{ + tearDown: tearDown, + gc: gc, + clientSet: clientSet, + apiExtensionClient: apiExtensionClient, + clientPool: clientPool, + startGC: startGC, + syncPeriod: syncPeriod, + } +} + +func createNamespaceOrDie(name string, c clientset.Interface, t *testing.T) *v1.Namespace { + ns := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: name}} + if _, err := c.Core().Namespaces().Create(ns); err != nil { + t.Fatalf("failed to create namespace: %v", err) + } + falseVar := false + _, err := c.Core().ServiceAccounts(ns.Name).Create(&v1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{Name: "default"}, + AutomountServiceAccountToken: &falseVar, + }) + if err != nil { + t.Fatalf("failed to create service account: %v", err) + } + return ns +} + +func deleteNamespaceOrDie(name string, c clientset.Interface, t *testing.T) { + zero := int64(0) + background := metav1.DeletePropagationBackground + err := c.Core().Namespaces().Delete(name, &metav1.DeleteOptions{GracePeriodSeconds: &zero, PropagationPolicy: &background}) + if err != nil { + t.Fatalf("failed to delete namespace %q: %v", name, err) + } } // This test simulates the cascading deletion. func TestCascadingDeletion(t *testing.T) { - stopCh := make(chan struct{}) + ctx := setup(t, 5) + defer ctx.tearDown() - glog.V(6).Infof("TestCascadingDeletion starts") - defer glog.V(6).Infof("TestCascadingDeletion ends") - s, closeFn, gc, clientSet := setup(t, stopCh) - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. 
- close(stopCh) - closeFn() - }() + gc, clientSet := ctx.gc, ctx.clientSet - ns := framework.CreateTestingNamespace("gc-cascading-deletion", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) + ns := createNamespaceOrDie("gc-cascading-deletion", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) rcClient := clientSet.Core().ReplicationControllers(ns.Name) podClient := clientSet.Core().Pods(ns.Name) @@ -234,7 +370,6 @@ func TestCascadingDeletion(t *testing.T) { if len(pods.Items) != 3 { t.Fatalf("Expect only 3 pods") } - go gc.Run(5, stopCh) // delete one of the replication controller if err := rcClient.Delete(toBeDeletedRCName, getNonOrphanOptions()); err != nil { t.Fatalf("failed to delete replication controller: %v", err) @@ -262,17 +397,13 @@ func TestCascadingDeletion(t *testing.T) { // This test simulates the case where an object is created with an owner that // doesn't exist. It verifies the GC will delete such an object. func TestCreateWithNonExistentOwner(t *testing.T) { - stopCh := make(chan struct{}) - s, closeFn, gc, clientSet := setup(t, stopCh) - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. - close(stopCh) - closeFn() - }() + ctx := setup(t, 5) + defer ctx.tearDown() - ns := framework.CreateTestingNamespace("gc-non-existing-owner", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) + clientSet := ctx.clientSet + + ns := createNamespaceOrDie("gc-non-existing-owner", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) podClient := clientSet.Core().Pods(ns.Name) @@ -290,7 +421,6 @@ func TestCreateWithNonExistentOwner(t *testing.T) { if len(pods.Items) != 1 { t.Fatalf("Expect only 1 pod") } - go gc.Run(5, stopCh) // wait for the garbage collector to delete the pod if err := integration.WaitForPodToDisappear(podClient, garbageCollectedPodName, 5*time.Second, 30*time.Second); err != nil { t.Fatalf("expect pod %s to be garbage collected, got err= %v", garbageCollectedPodName, err) @@ -361,21 +491,13 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa // time of our pre-submit tests to increase submit-queue throughput. We'll add // e2e tests that put more stress. func TestStressingCascadingDeletion(t *testing.T) { - t.Logf("starts garbage collector stress test") - stopCh := make(chan struct{}) - s, closeFn, gc, clientSet := setup(t, stopCh) + ctx := setup(t, 5) + defer ctx.tearDown() - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. 
- close(stopCh) - closeFn() - }() + gc, clientSet := ctx.gc, ctx.clientSet - ns := framework.CreateTestingNamespace("gc-stressing-cascading-deletion", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) - - go gc.Run(5, stopCh) + ns := createNamespaceOrDie("gc-stressing-cascading-deletion", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) const collections = 10 var wg sync.WaitGroup @@ -428,18 +550,13 @@ func TestStressingCascadingDeletion(t *testing.T) { } func TestOrphaning(t *testing.T) { - stopCh := make(chan struct{}) - s, closeFn, gc, clientSet := setup(t, stopCh) + ctx := setup(t, 5) + defer ctx.tearDown() - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. - close(stopCh) - closeFn() - }() + gc, clientSet := ctx.gc, ctx.clientSet - ns := framework.CreateTestingNamespace("gc-orphaning", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) + ns := createNamespaceOrDie("gc-orphaning", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) podClient := clientSet.Core().Pods(ns.Name) rcClient := clientSet.Core().ReplicationControllers(ns.Name) @@ -462,7 +579,6 @@ func TestOrphaning(t *testing.T) { } podUIDs = append(podUIDs, pod.ObjectMeta.UID) } - go gc.Run(5, stopCh) // we need wait for the gc to observe the creation of the pods, otherwise if // the deletion of RC is observed before the creation of the pods, the pods @@ -504,18 +620,13 @@ func TestOrphaning(t *testing.T) { } func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) { - stopCh := make(chan struct{}) - s, closeFn, gc, clientSet := setup(t, stopCh) + ctx := setup(t, 5) + defer ctx.tearDown() - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. - close(stopCh) - closeFn() - }() + clientSet := ctx.clientSet - ns := framework.CreateTestingNamespace("gc-foreground1", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) + ns := createNamespaceOrDie("gc-foreground1", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) podClient := clientSet.Core().Pods(ns.Name) rcClient := clientSet.Core().ReplicationControllers(ns.Name) @@ -538,8 +649,6 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - go gc.Run(5, stopCh) - err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions()) if err != nil { t.Fatalf("Failed to delete the rc: %v", err) @@ -571,18 +680,13 @@ func TestSolidOwnerDoesNotBlockWaitingOwner(t *testing.T) { } func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) { - stopCh := make(chan struct{}) - s, closeFn, gc, clientSet := setup(t, stopCh) + ctx := setup(t, 5) + defer ctx.tearDown() - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. 
- close(stopCh) - closeFn() - }() + clientSet := ctx.clientSet - ns := framework.CreateTestingNamespace("gc-foreground2", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) + ns := createNamespaceOrDie("gc-foreground2", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) podClient := clientSet.Core().Pods(ns.Name) rcClient := clientSet.Core().ReplicationControllers(ns.Name) @@ -613,8 +717,6 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - go gc.Run(5, stopCh) - err = rcClient.Delete(toBeDeletedRCName, getForegroundOptions()) if err != nil { t.Fatalf("Failed to delete the rc: %v", err) @@ -644,18 +746,12 @@ func TestNonBlockingOwnerRefDoesNotBlock(t *testing.T) { } func TestBlockingOwnerRefDoesBlock(t *testing.T) { - stopCh := make(chan struct{}) - s, closeFn, gc, clientSet := setup(t, stopCh) + ctx := setup(t, 0) + defer ctx.tearDown() + gc, clientSet := ctx.gc, ctx.clientSet - defer func() { - // We have to close the stop channel first, so the shared informers can terminate their watches; - // otherwise closeFn() will hang waiting for active client connections to finish. - close(stopCh) - closeFn() - }() - - ns := framework.CreateTestingNamespace("gc-foreground3", s, t) - defer framework.DeleteTestingNamespace(ns, s, t) + ns := createNamespaceOrDie("foo", clientSet, t) + defer deleteNamespaceOrDie(ns.Name, clientSet, t) podClient := clientSet.Core().Pods(ns.Name) rcClient := clientSet.Core().ReplicationControllers(ns.Name) @@ -675,10 +771,9 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - go gc.Run(5, stopCh) - // this makes sure the garbage collector will have added the pod to its // dependency graph before handling the foreground deletion of the rc. + ctx.startGC(5) timeout := make(chan struct{}) go func() { select { @@ -686,7 +781,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) { close(timeout) } }() - if !cache.WaitForCacheSync(timeout, gc.HasSynced) { + if !cache.WaitForCacheSync(timeout, gc.IsSynced) { t.Fatalf("failed to wait for garbage collector to be synced") } @@ -694,7 +789,7 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) { if err != nil { t.Fatalf("Failed to delete the rc: %v", err) } - time.Sleep(30 * time.Second) + time.Sleep(15 * time.Second) // verify the toBeDeleteRC is NOT deleted _, err = rcClient.Get(toBeDeletedRC.Name, metav1.GetOptions{}) if err != nil { @@ -710,3 +805,215 @@ func TestBlockingOwnerRefDoesBlock(t *testing.T) { t.Errorf("expect there to be 1 pods, got %#v", pods.Items) } } + +// TestCustomResourceCascadingDeletion ensures the basic cascading delete +// behavior supports custom resources. +func TestCustomResourceCascadingDeletion(t *testing.T) { + ctx := setup(t, 5) + defer ctx.tearDown() + + clientSet, apiExtensionClient, clientPool := ctx.clientSet, ctx.apiExtensionClient, ctx.clientPool + + ns := createNamespaceOrDie("crd-cascading", clientSet, t) + + definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, clientPool, ns.Name) + + // Create a custom owner resource. + owner := newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")) + owner, err := resourceClient.Create(owner) + if err != nil { + t.Fatalf("failed to create owner resource %q: %v", owner.GetName(), err) + } + t.Logf("created owner resource %q", owner.GetName()) + + // Create a custom dependent resource. 
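+	// link copies the owner's UID into the dependent's ownerReferences; the UID
+	// is what the garbage collector keys the ownership relationship on.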
+	dependent := newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
+	link(t, owner, dependent)
+
+	dependent, err = resourceClient.Create(dependent)
+	if err != nil {
+		t.Fatalf("failed to create dependent resource %q: %v", dependent.GetName(), err)
+	}
+	t.Logf("created dependent resource %q", dependent.GetName())
+
+	// Delete the owner.
+	foreground := metav1.DeletePropagationForeground
+	err = resourceClient.Delete(owner.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
+	if err != nil {
+		t.Fatalf("failed to delete owner resource %q: %v", owner.GetName(), err)
+	}
+
+	// Ensure the owner is deleted.
+	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
+		_, err := resourceClient.Get(owner.GetName(), metav1.GetOptions{})
+		return errors.IsNotFound(err), nil
+	}); err != nil {
+		t.Fatalf("failed waiting for owner resource %q to be deleted", owner.GetName())
+	}
+
+	// Ensure the dependent is deleted.
+	_, err = resourceClient.Get(dependent.GetName(), metav1.GetOptions{})
+	if err == nil {
+		t.Fatalf("expected dependent %q to be deleted", dependent.GetName())
+	} else {
+		if !errors.IsNotFound(err) {
+			t.Fatalf("unexpected error getting dependent %q: %v", dependent.GetName(), err)
+		}
+	}
+}
+
+// TestMixedRelationships ensures that owner/dependent relationships work
+// between core and custom resources.
+//
+// TODO: Consider how this could be represented with table-style tests (e.g. a
+// before/after expected object graph given a delete operation targeting a
+// specific node in the before graph with certain delete options).
+func TestMixedRelationships(t *testing.T) {
+	ctx := setup(t, 5)
+	defer ctx.tearDown()
+
+	clientSet, apiExtensionClient, clientPool := ctx.clientSet, ctx.apiExtensionClient, ctx.clientPool
+
+	ns := createNamespaceOrDie("crd-mixed", clientSet, t)
+
+	configMapClient := clientSet.Core().ConfigMaps(ns.Name)
+
+	definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, clientPool, ns.Name)
+
+	// Create a custom owner resource.
+	customOwner, err := resourceClient.Create(newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")))
+	if err != nil {
+		t.Fatalf("failed to create owner: %v", err)
+	}
+	t.Logf("created custom owner %q", customOwner.GetName())
+
+	// Create a core dependent resource.
+	coreDependent := newConfigMap(ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
+	link(t, customOwner, coreDependent)
+	coreDependent, err = configMapClient.Create(coreDependent)
+	if err != nil {
+		t.Fatalf("failed to create dependent: %v", err)
+	}
+	t.Logf("created core dependent %q", coreDependent.GetName())
+
+	// Create a core owner resource.
+	coreOwner, err := configMapClient.Create(newConfigMap(ns.Name, names.SimpleNameGenerator.GenerateName("owner")))
+	if err != nil {
+		t.Fatalf("failed to create owner: %v", err)
+	}
+	t.Logf("created core owner %q: %#v", coreOwner.GetName(), coreOwner)
+
+	// Create a custom dependent resource.
+	customDependent := newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
+	coreOwner.TypeMeta.Kind = "ConfigMap"
+	coreOwner.TypeMeta.APIVersion = "v1"
+	link(t, coreOwner, customDependent)
+	customDependent, err = resourceClient.Create(customDependent)
+	if err != nil {
+		t.Fatalf("failed to create dependent: %v", err)
+	}
+	t.Logf("created custom dependent %q", customDependent.GetName())
+
+	// Delete the custom owner.
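+	// Foreground propagation marks the owner for deletion and lets the garbage
+	// collector clean up its dependents; the checks below verify that both the
+	// owner and the core dependent disappear.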
+	foreground := metav1.DeletePropagationForeground
+	err = resourceClient.Delete(customOwner.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
+	if err != nil {
+		t.Fatalf("failed to delete owner resource %q: %v", customOwner.GetName(), err)
+	}
+
+	// Ensure the owner is deleted.
+	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
+		_, err := resourceClient.Get(customOwner.GetName(), metav1.GetOptions{})
+		return errors.IsNotFound(err), nil
+	}); err != nil {
+		t.Fatalf("failed waiting for owner resource %q to be deleted", customOwner.GetName())
+	}
+
+	// Ensure the dependent is deleted.
+	_, err = resourceClient.Get(coreDependent.GetName(), metav1.GetOptions{})
+	if err == nil {
+		t.Fatalf("expected dependent %q to be deleted", coreDependent.GetName())
+	} else {
+		if !errors.IsNotFound(err) {
+			t.Fatalf("unexpected error getting dependent %q: %v", coreDependent.GetName(), err)
+		}
+	}
+
+	// Delete the core owner.
+	err = configMapClient.Delete(coreOwner.GetName(), &metav1.DeleteOptions{PropagationPolicy: &foreground})
+	if err != nil {
+		t.Fatalf("failed to delete owner resource %q: %v", coreOwner.GetName(), err)
+	}
+
+	// Ensure the owner is deleted.
+	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
+		_, err := configMapClient.Get(coreOwner.GetName(), metav1.GetOptions{})
+		return errors.IsNotFound(err), nil
+	}); err != nil {
+		t.Fatalf("failed waiting for owner resource %q to be deleted", coreOwner.GetName())
+	}
+
+	// Ensure the dependent is deleted.
+	_, err = resourceClient.Get(customDependent.GetName(), metav1.GetOptions{})
+	if err == nil {
+		t.Fatalf("expected dependent %q to be deleted", customDependent.GetName())
+	} else {
+		if !errors.IsNotFound(err) {
+			t.Fatalf("unexpected error getting dependent %q: %v", customDependent.GetName(), err)
+		}
+	}
+}
+
+// TestCRDDeletionCascading ensures that deleting a custom resource definition
+// cascades to an instance of it and to a core resource owned by that instance.
+func TestCRDDeletionCascading(t *testing.T) {
+	ctx := setup(t, 5)
+	defer ctx.tearDown()
+
+	clientSet, apiExtensionClient, clientPool := ctx.clientSet, ctx.apiExtensionClient, ctx.clientPool
+
+	ns := createNamespaceOrDie("crd-mixed", clientSet, t)
+
+	configMapClient := clientSet.Core().ConfigMaps(ns.Name)
+
+	definition, resourceClient := createRandomCustomResourceDefinition(t, apiExtensionClient, clientPool, ns.Name)
+
+	// Create a custom owner resource.
+	owner, err := resourceClient.Create(newCRDInstance(definition, ns.Name, names.SimpleNameGenerator.GenerateName("owner")))
+	if err != nil {
+		t.Fatalf("failed to create owner: %v", err)
+	}
+	t.Logf("created owner %q", owner.GetName())
+
+	// Create a core dependent resource.
+	dependent := newConfigMap(ns.Name, names.SimpleNameGenerator.GenerateName("dependent"))
+	link(t, owner, dependent)
+	dependent, err = configMapClient.Create(dependent)
+	if err != nil {
+		t.Fatalf("failed to create dependent: %v", err)
+	}
+	t.Logf("created dependent %q", dependent.GetName())
+
+	time.Sleep(ctx.syncPeriod + 5*time.Second)
+
+	// Delete the definition, which should cascade to the owner and ultimately its dependents.
+	if err := apiextensionstestserver.DeleteCustomResourceDefinition(definition, apiExtensionClient); err != nil {
+		t.Fatalf("failed to delete %q: %v", definition.Name, err)
+	}
+
+	// Ensure the owner is deleted.
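+	// Removing the definition deletes its instances, so the owner should
+	// disappear first; the garbage collector should then delete the dependent
+	// ConfigMap that the instance owns.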
+	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
+		_, err := resourceClient.Get(owner.GetName(), metav1.GetOptions{})
+		return errors.IsNotFound(err), nil
+	}); err != nil {
+		t.Fatalf("failed waiting for owner %q to be deleted", owner.GetName())
+	}
+
+	// Ensure the dependent is deleted.
+	if err := wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
+		_, err := configMapClient.Get(dependent.GetName(), metav1.GetOptions{})
+		return errors.IsNotFound(err), nil
+	}); err != nil {
+		t.Fatalf("failed waiting for dependent %q (owned by %q) to be deleted", dependent.GetName(), owner.GetName())
+	}
+}
diff --git a/test/integration/garbagecollector/main_test.go b/test/integration/garbagecollector/main_test.go
deleted file mode 100644
index 1e2f4b243bc..00000000000
--- a/test/integration/garbagecollector/main_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package garbagecollector
-
-import (
-	"testing"
-
-	"k8s.io/kubernetes/test/integration/framework"
-)
-
-func TestMain(m *testing.M) {
-	framework.EtcdMain(m.Run)
-}