Merge pull request #41114 from ncdc/shared-informers-04-endpoints
Automatic merge from submit-queue (batch tested with PRs 39418, 41175, 40355, 41114, 32325) Switch endpoints controller to shared informers cc @bprashanth @thockin @deads2k @sttts @liggitt @smarterclayton @kubernetes/sig-scalability-pr-reviews
This commit is contained in:
		| @@ -46,7 +46,8 @@ import ( | ||||
|  | ||||
| func startEndpointController(ctx ControllerContext) (bool, error) { | ||||
| 	go endpointcontroller.NewEndpointController( | ||||
| 		ctx.InformerFactory.Pods().Informer(), | ||||
| 		ctx.NewInformerFactory.Core().V1().Pods(), | ||||
| 		ctx.NewInformerFactory.Core().V1().Services(), | ||||
| 		ctx.ClientBuilder.ClientOrDie("endpoint-controller"), | ||||
| 	).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop) | ||||
| 	return true, nil | ||||
|   | ||||
| @@ -404,7 +404,7 @@ function start_apiserver { | ||||
|     # NOTE: system:masters will be removed in the future | ||||
|     kube::util::create_client_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" 'client-ca' kubelet system:node:${HOSTNAME_OVERRIDE} system:nodes | ||||
|     kube::util::create_client_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" 'client-ca' kube-proxy system:kube-proxy system:nodes | ||||
|     kube::util::create_client_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" 'client-ca' controller system:controller system:masters | ||||
|     kube::util::create_client_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" 'client-ca' controller system:kube-controller-manager | ||||
|     kube::util::create_client_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" 'client-ca' scheduler system:scheduler system:masters | ||||
|     kube::util::create_client_certkey "${CONTROLPLANE_SUDO}" "${CERT_DIR}" 'client-ca' admin system:admin system:masters | ||||
|  | ||||
| @@ -488,6 +488,7 @@ function start_controller_manager { | ||||
|       --cloud-provider="${CLOUD_PROVIDER}" \ | ||||
|       --cloud-config="${CLOUD_CONFIG}" \ | ||||
|       --kubeconfig "$CERT_DIR"/controller.kubeconfig \ | ||||
|       --use-service-account-credentials \ | ||||
|       --master="https://${API_HOST}:${API_SECURE_PORT}" >"${CTLRMGR_LOG}" 2>&1 & | ||||
|     CTLRMGR_PID=$! | ||||
| } | ||||
|   | ||||
| @@ -20,19 +20,16 @@ go_library( | ||||
|         "//pkg/api/v1/endpoints:go_default_library", | ||||
|         "//pkg/api/v1/pod:go_default_library", | ||||
|         "//pkg/client/clientset_generated/clientset:go_default_library", | ||||
|         "//pkg/client/legacylisters:go_default_library", | ||||
|         "//pkg/controller:go_default_library", | ||||
|         "//pkg/controller/informers:go_default_library", | ||||
|         "//pkg/client/informers/informers_generated/core/v1:go_default_library", | ||||
|         "//pkg/client/listers/core/v1:go_default_library", | ||||
|         "//pkg/util/metrics:go_default_library", | ||||
|         "//vendor:github.com/golang/glog", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/api/errors", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/labels", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/runtime", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/util/runtime", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/util/sets", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/util/wait", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/watch", | ||||
|         "//vendor:k8s.io/client-go/tools/cache", | ||||
|         "//vendor:k8s.io/client-go/util/workqueue", | ||||
|     ], | ||||
| @@ -49,6 +46,7 @@ go_test( | ||||
|         "//pkg/api/v1:go_default_library", | ||||
|         "//pkg/api/v1/endpoints:go_default_library", | ||||
|         "//pkg/client/clientset_generated/clientset:go_default_library", | ||||
|         "//pkg/client/informers/informers_generated:go_default_library", | ||||
|         "//pkg/controller:go_default_library", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", | ||||
|         "//vendor:k8s.io/apimachinery/pkg/runtime", | ||||
|   | ||||
| @@ -25,11 +25,9 @@ import ( | ||||
| 	"k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/labels" | ||||
| 	"k8s.io/apimachinery/pkg/runtime" | ||||
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/apimachinery/pkg/watch" | ||||
| 	"k8s.io/client-go/tools/cache" | ||||
| 	"k8s.io/client-go/util/workqueue" | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| @@ -37,9 +35,8 @@ import ( | ||||
| 	podutil "k8s.io/kubernetes/pkg/api/v1/pod" | ||||
| 	utilpod "k8s.io/kubernetes/pkg/api/v1/pod" | ||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | ||||
| 	"k8s.io/kubernetes/pkg/client/legacylisters" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| 	"k8s.io/kubernetes/pkg/controller/informers" | ||||
| 	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1" | ||||
| 	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1" | ||||
| 	"k8s.io/kubernetes/pkg/util/metrics" | ||||
|  | ||||
| 	"github.com/golang/glog" | ||||
| @@ -51,10 +48,6 @@ const ( | ||||
| 	// shorter amount of time before a mistaken endpoint is corrected. | ||||
| 	FullServiceResyncPeriod = 30 * time.Second | ||||
|  | ||||
| 	// We must avoid syncing service until the pod store has synced. If it hasn't synced, to | ||||
| 	// avoid a hot loop, we'll wait this long between checks. | ||||
| 	PodStoreSyncedPollPeriod = 100 * time.Millisecond | ||||
|  | ||||
| 	// An annotation on the Service denoting if the endpoints controller should | ||||
| 	// go ahead and create endpoints for unready pods. This annotation is | ||||
| 	// currently only used by StatefulSets, where we need the pod to be DNS | ||||
| @@ -73,7 +66,7 @@ var ( | ||||
| ) | ||||
|  | ||||
| // NewEndpointController returns a new *EndpointController. | ||||
| func NewEndpointController(podInformer cache.SharedIndexInformer, client clientset.Interface) *EndpointController { | ||||
| func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController { | ||||
| 	if client != nil && client.Core().RESTClient().GetRateLimiter() != nil { | ||||
| 		metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter()) | ||||
| 	} | ||||
| @@ -82,18 +75,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client clients | ||||
| 		queue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"), | ||||
| 	} | ||||
|  | ||||
| 	e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer( | ||||
| 		&cache.ListWatch{ | ||||
| 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { | ||||
| 				return e.client.Core().Services(metav1.NamespaceAll).List(options) | ||||
| 			}, | ||||
| 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { | ||||
| 				return e.client.Core().Services(metav1.NamespaceAll).Watch(options) | ||||
| 			}, | ||||
| 		}, | ||||
| 		&v1.Service{}, | ||||
| 		// TODO: Can we have much longer period here? | ||||
| 		FullServiceResyncPeriod, | ||||
| 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod( | ||||
| 		cache.ResourceEventHandlerFuncs{ | ||||
| 			AddFunc: e.enqueueService, | ||||
| 			UpdateFunc: func(old, cur interface{}) { | ||||
| @@ -101,26 +83,19 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client clients | ||||
| 			}, | ||||
| 			DeleteFunc: e.enqueueService, | ||||
| 		}, | ||||
| 		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, | ||||
| 		// TODO: Can we have much longer period here? | ||||
| 		FullServiceResyncPeriod, | ||||
| 	) | ||||
| 	e.serviceLister = serviceInformer.Lister() | ||||
| 	e.servicesSynced = serviceInformer.Informer().HasSynced | ||||
|  | ||||
| 	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||
| 	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ | ||||
| 		AddFunc:    e.addPod, | ||||
| 		UpdateFunc: e.updatePod, | ||||
| 		DeleteFunc: e.deletePod, | ||||
| 	}) | ||||
| 	e.podStore.Indexer = podInformer.GetIndexer() | ||||
| 	e.podController = podInformer.GetController() | ||||
| 	e.podStoreSynced = podInformer.HasSynced | ||||
|  | ||||
| 	return e | ||||
| } | ||||
|  | ||||
| // NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer. | ||||
| func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { | ||||
| 	podInformer := informers.NewPodInformer(client, resyncPeriod()) | ||||
| 	e := NewEndpointController(podInformer, client) | ||||
| 	e.internalPodInformer = podInformer | ||||
| 	e.podLister = podInformer.Lister() | ||||
| 	e.podsSynced = podInformer.Informer().HasSynced | ||||
|  | ||||
| 	return e | ||||
| } | ||||
| @@ -129,15 +104,19 @@ func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod c | ||||
| type EndpointController struct { | ||||
| 	client clientset.Interface | ||||
|  | ||||
| 	serviceStore listers.StoreToServiceLister | ||||
| 	podStore     listers.StoreToPodLister | ||||
| 	// serviceLister is able to list/get services and is populated by the shared informer passed to | ||||
| 	// NewEndpointController. | ||||
| 	serviceLister corelisters.ServiceLister | ||||
| 	// servicesSynced returns true if the service shared informer has been synced at least once. | ||||
| 	// Added as a member to the struct to allow injection for testing. | ||||
| 	servicesSynced cache.InformerSynced | ||||
|  | ||||
| 	// internalPodInformer is used to hold a personal informer.  If we're using | ||||
| 	// a normal shared informer, then the informer will be started for us.  If | ||||
| 	// we have a personal informer, we must start it ourselves.   If you start | ||||
| 	// the controller using NewEndpointController(passing SharedInformer), this | ||||
| 	// will be null | ||||
| 	internalPodInformer cache.SharedIndexInformer | ||||
| 	// podLister is able to list/get pods and is populated by the shared informer passed to | ||||
| 	// NewEndpointController. | ||||
| 	podLister corelisters.PodLister | ||||
| 	// podsSynced returns true if the pod shared informer has been synced at least once. | ||||
| 	// Added as a member to the struct to allow injection for testing. | ||||
| 	podsSynced cache.InformerSynced | ||||
|  | ||||
| 	// Services that need to be updated. A channel is inappropriate here, | ||||
| 	// because it allows services with lots of pods to be serviced much | ||||
| @@ -145,14 +124,6 @@ type EndpointController struct { | ||||
| 	// service that's inserted multiple times to be processed more than | ||||
| 	// necessary. | ||||
| 	queue workqueue.RateLimitingInterface | ||||
|  | ||||
| 	// Since we join two objects, we'll watch both of them with | ||||
| 	// controllers. | ||||
| 	serviceController cache.Controller | ||||
| 	podController     cache.Controller | ||||
| 	// podStoreSynced returns true if the pod store has been synced at least once. | ||||
| 	// Added as a member to the struct to allow injection for testing. | ||||
| 	podStoreSynced func() bool | ||||
| } | ||||
|  | ||||
| // Runs e; will not return until stopCh is closed. workers determines how many | ||||
| @@ -161,10 +132,8 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { | ||||
| 	defer utilruntime.HandleCrash() | ||||
| 	defer e.queue.ShutDown() | ||||
|  | ||||
| 	go e.serviceController.Run(stopCh) | ||||
| 	go e.podController.Run(stopCh) | ||||
|  | ||||
| 	if !cache.WaitForCacheSync(stopCh, e.podStoreSynced) { | ||||
| 	if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) { | ||||
| 		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| @@ -177,16 +146,12 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { | ||||
| 		e.checkLeftoverEndpoints() | ||||
| 	}() | ||||
|  | ||||
| 	if e.internalPodInformer != nil { | ||||
| 		go e.internalPodInformer.Run(stopCh) | ||||
| 	} | ||||
|  | ||||
| 	<-stopCh | ||||
| } | ||||
|  | ||||
| func (e *EndpointController) getPodServiceMemberships(pod *v1.Pod) (sets.String, error) { | ||||
| 	set := sets.String{} | ||||
| 	services, err := e.serviceStore.GetPodServices(pod) | ||||
| 	services, err := e.serviceLister.GetPodServices(pod) | ||||
| 	if err != nil { | ||||
| 		// don't log this error because this function makes pointless | ||||
| 		// errors when no services match. | ||||
| @@ -338,8 +303,12 @@ func (e *EndpointController) syncService(key string) error { | ||||
| 		glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) | ||||
| 	}() | ||||
|  | ||||
| 	obj, exists, err := e.serviceStore.Indexer.GetByKey(key) | ||||
| 	if err != nil || !exists { | ||||
| 	namespace, name, err := cache.SplitMetaNamespaceKey(key) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	service, err := e.serviceLister.Services(namespace).Get(name) | ||||
| 	if err != nil { | ||||
| 		// Delete the corresponding endpoint, as the service has been deleted. | ||||
| 		// TODO: Please note that this will delete an endpoint when a | ||||
| 		// service is deleted. However, if we're down at the time when | ||||
| @@ -358,7 +327,6 @@ func (e *EndpointController) syncService(key string) error { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	service := obj.(*v1.Service) | ||||
| 	if service.Spec.Selector == nil { | ||||
| 		// services without a selector receive no endpoints from this controller; | ||||
| 		// these services will receive the endpoints that are created out-of-band via the REST API. | ||||
| @@ -366,7 +334,7 @@ func (e *EndpointController) syncService(key string) error { | ||||
| 	} | ||||
|  | ||||
| 	glog.V(5).Infof("About to update endpoints for service %q", key) | ||||
| 	pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated()) | ||||
| 	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated()) | ||||
| 	if err != nil { | ||||
| 		// Since we're getting stuff from a local cache, it is | ||||
| 		// basically impossible to get this error. | ||||
|   | ||||
| @@ -33,6 +33,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/api/v1" | ||||
| 	endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints" | ||||
| 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" | ||||
| 	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated" | ||||
| 	"k8s.io/kubernetes/pkg/controller" | ||||
| ) | ||||
|  | ||||
| @@ -92,6 +93,25 @@ func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResp | ||||
| 	return httptest.NewServer(mux), &fakeEndpointsHandler | ||||
| } | ||||
|  | ||||
| type endpointController struct { | ||||
| 	*EndpointController | ||||
| 	podStore     cache.Store | ||||
| 	serviceStore cache.Store | ||||
| } | ||||
|  | ||||
| func newController(url string) *endpointController { | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	informerFactory := informers.NewSharedInformerFactory(nil, client, controller.NoResyncPeriodFunc()) | ||||
| 	endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), client) | ||||
| 	endpoints.podsSynced = alwaysReady | ||||
| 	endpoints.servicesSynced = alwaysReady | ||||
| 	return &endpointController{ | ||||
| 		endpoints, | ||||
| 		informerFactory.Core().V1().Pods().Informer().GetStore(), | ||||
| 		informerFactory.Core().V1().Services().Informer().GetStore(), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { | ||||
| 	ns := metav1.NamespaceDefault | ||||
| 	testServer, endpointsHandler := makeTestServer(t, ns, | ||||
| @@ -107,10 +127,8 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}}, | ||||
| 	}) | ||||
| @@ -140,9 +158,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	endpoints.checkLeftoverEndpoints() | ||||
|  | ||||
| 	if e, a := 1, endpoints.queue.Len(); e != a { | ||||
| @@ -169,12 +185,10 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	endpoints := newController(testServer.URL) | ||||
|  | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	addPods(endpoints.podStore, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{}, | ||||
| @@ -212,11 +226,9 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{}, | ||||
| @@ -251,11 +263,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { | ||||
| 			Subsets: []v1.EndpointSubset{}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{}, | ||||
| @@ -289,11 +299,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { | ||||
| 			Subsets: []v1.EndpointSubset{}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 0, 1, 1) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 0, 1, 1) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{}, | ||||
| @@ -327,11 +335,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { | ||||
| 			Subsets: []v1.EndpointSubset{}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 1, 1, 1) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 1, 1, 1) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{}, | ||||
| @@ -369,11 +375,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{"foo": "bar"}, | ||||
| @@ -410,11 +414,9 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, metav1.NamespaceDefault, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0) | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{"foo": "bar"}, | ||||
| @@ -430,12 +432,10 @@ func TestSyncEndpointsItems(t *testing.T) { | ||||
| 	testServer, endpointsHandler := makeTestServer(t, ns, | ||||
| 		serverResponse{http.StatusOK, &v1.Endpoints{}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 3, 2, 0) | ||||
| 	addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found! | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 3, 2, 0) | ||||
| 	addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found! | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{"foo": "bar"}, | ||||
| @@ -473,12 +473,10 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { | ||||
| 	testServer, endpointsHandler := makeTestServer(t, ns, | ||||
| 		serverResponse{http.StatusOK, &v1.Endpoints{}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 3, 2, 0) | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 3, 2, 0) | ||||
| 	serviceLabels := map[string]string{"foo": "bar"} | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      "foo", | ||||
| 			Namespace: ns, | ||||
| @@ -534,12 +532,10 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { | ||||
| 			}}, | ||||
| 		}}) | ||||
| 	defer testServer.Close() | ||||
| 	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) | ||||
| 	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc) | ||||
| 	endpoints.podStoreSynced = alwaysReady | ||||
| 	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0) | ||||
| 	endpoints := newController(testServer.URL) | ||||
| 	addPods(endpoints.podStore, ns, 1, 1, 0) | ||||
| 	serviceLabels := map[string]string{"baz": "blah"} | ||||
| 	endpoints.serviceStore.Indexer.Add(&v1.Service{ | ||||
| 	endpoints.serviceStore.Add(&v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      "foo", | ||||
| 			Namespace: ns, | ||||
|   | ||||
| @@ -280,7 +280,7 @@ func ClusterRoles() []rbac.ClusterRole { | ||||
| 				rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(), | ||||
|  | ||||
| 				rbac.NewRule("list", "watch").Groups("*").Resources("namespaces", "nodes", "persistentvolumeclaims", | ||||
| 					"persistentvolumes", "pods", "secrets", "serviceaccounts", "replicationcontrollers").RuleOrDie(), | ||||
| 					"persistentvolumes", "pods", "secrets", "services", "serviceaccounts", "replicationcontrollers").RuleOrDie(), | ||||
| 				rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(), | ||||
| 				rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(), | ||||
| 			}, | ||||
|   | ||||
| @@ -454,6 +454,7 @@ items: | ||||
|     - replicationcontrollers | ||||
|     - secrets | ||||
|     - serviceaccounts | ||||
|     - services | ||||
|     verbs: | ||||
|     - list | ||||
|     - watch | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Submit Queue
					Kubernetes Submit Queue